"""Hardware-free tests for the Diagnostics (DTC) one-off command path.

The polling thread owns the serial link, so DTC reads/clears can't hit the
link directly -- they go through PollScheduler.run_oneoff, which serializes
them onto the same thread that does PID reads. These tests prove (a) a one-off
returns its callable's value (and re-raises its exceptions), (b) one-offs are
drained at the top of tick() without disturbing normal polling, and (c) the
same path runs on a live background thread while polling continues.

Run: python tests/test_diagnostics.py
"""

import os
import sys
import threading
import time

# Put the parent of this file's directory on sys.path so `obdcore` resolves
# when the file is executed directly as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from obdcore import (PidRegistry, TimeSeriesStore, PollScheduler,
                     load_default)
from obdcore.mock import MockLink


class FakeClock:
    """Manually advanced clock: calling the instance returns the current time."""

    def __init__(self):
        self.t = 0.0

    def __call__(self):
        return self.t

    def advance(self, dt):
        """Move the clock forward by ``dt``."""
        self.t += dt


def _setup(specs):
    """Build a scheduler wired to a MockLink and a FakeClock.

    ``specs`` is passed unchanged to ``PollScheduler.set_subscriptions``.
    Returns the tuple ``(clock, registry, store, link, scheduler)``.
    """
    clk = FakeClock()
    reg = PidRegistry(load_default())
    store = TimeSeriesStore()
    link = MockLink(clock=clk)
    sch = PollScheduler(link, reg, store, clock=clk)
    sch.set_subscriptions(specs)
    return clk, reg, store, link, sch


def test_oneoff_returns_value_and_polling_continues():
    """Single-threaded (disconnected-style) path: run_oneoff drains inline when
    no background thread is running, and normal tick() polling is untouched."""
    clk, reg, store, link, sch = _setup([("ICP", 5), ("FICM_M", 2)])

    # A one-off returns exactly what its callable returns -- here, the
    # MockLink's stored DTCs (mode 03 -> ["P0148"]).
    stored = sch.run_oneoff(lambda: link.read_dtcs("03", 0x43))
    assert stored == ["P0148"], stored

    # Pending / permanent are empty on the mock.
    assert sch.run_oneoff(lambda: link.read_dtcs("07", 0x47)) == []
    assert sch.run_oneoff(lambda: link.read_dtcs("0A", 0x4A)) == []

    # A clear one-off returns the link's ack.
    assert sch.run_oneoff(lambda: link.clear_dtcs()) is True

    # Normal polling still happens through tick().
    for _ in range(20):
        sch.tick()
        clk.advance(0.05)
    assert store.latest("ICP") is not None, "ICP should still be polled"
    assert store.latest("FICM_M") == 48.0
    print(" one-off returns value + tick() polling intact: OK")


def test_oneoff_exception_propagates():
    """An exception raised by the one-off callable surfaces from run_oneoff."""
    clk, reg, store, link, sch = _setup([("ICP", 5)])

    def boom():
        raise ValueError("link blew up")

    try:
        sch.run_oneoff(boom)
        # AssertionError is not a ValueError, so it escapes the handler below.
        raise AssertionError("expected ValueError to propagate")
    except ValueError as e:
        assert "blew up" in str(e)
    print(" one-off re-raises callable's exception: OK")


def test_oneoff_drained_at_top_of_tick():
    """Enqueue a job without inline-draining (simulate the running scheduler),
    then prove a single tick() drains+runs it before doing its PID reads."""
    clk, reg, store, link, sch = _setup([("ICP", 5)])
    sch._running = True  # pretend the thread is alive

    box = {"ran": False, "ticks_at_run": None}

    def job():
        box["ran"] = True
        # store is still empty -> the one-off ran before any PID push this tick
        box["ticks_at_run"] = store.latest("ICP")
        return "done"

    # Submit via the public API on a helper thread (it blocks waiting for the
    # job), so the main thread can drive tick() to drain it.
    result = {}

    def submit():
        try:
            result["val"] = sch.run_oneoff(job, timeout=2.0)
        except Exception as exc:
            # Hand the failure to the main thread instead of losing it to the
            # helper thread's excepthook.
            result["err"] = exc

    # daemon=True: a wedged submit must not keep the interpreter alive.
    t = threading.Thread(target=submit, daemon=True)
    t.start()

    # Wait until the job is queued, then drain it with one tick(). Fail loudly
    # if it never shows up rather than ticking against an empty queue.
    deadline = time.monotonic() + 2.0
    while sch._oneoffs.empty():
        if time.monotonic() >= deadline:
            raise AssertionError("one-off was never queued")
        time.sleep(0.001)

    sch.tick()  # drains the one-off, then polls ICP
    t.join(timeout=2.0)
    assert not t.is_alive(), "run_oneoff did not return after tick()"
    if "err" in result:
        raise result["err"]

    assert box["ran"] is True
    assert result.get("val") == "done"
    assert box["ticks_at_run"] is None, "one-off must run before this tick's reads"
    assert store.latest("ICP") is not None, "ICP polled in the same tick"
    print(" one-off drained at top of tick(), before PID reads: OK")


def test_oneoff_on_live_polling_thread():
    """Realistic path: scheduler running on its own thread (real clock) while a
    one-off DTC read is submitted from the 'GUI' thread and returns its value."""
    clk = time.time
    reg = PidRegistry(load_default())
    store = TimeSeriesStore()
    link = MockLink(clock=clk)
    sch = PollScheduler(link, reg, store, clock=clk)
    sch.set_subscriptions([("ICP", 50), ("FICM_M", 20)])
    sch.start()
    try:
        time.sleep(0.05)  # let it poll a few frames
        stored = sch.run_oneoff(lambda: link.read_dtcs("03", 0x43), timeout=3.0)
        assert stored == ["P0148"], stored
        assert sch.run_oneoff(lambda: link.clear_dtcs(), timeout=3.0) is True
        time.sleep(0.05)
    finally:
        # Always stop the background thread, even when an assertion fails.
        sch.stop()
    assert store.latest("ICP") is not None, "polling ran alongside the one-off"
    print(" one-off on live polling thread, polling continued: OK")


if __name__ == "__main__":
    for fn in [test_oneoff_returns_value_and_polling_continues,
               test_oneoff_exception_propagates,
               test_oneoff_drained_at_top_of_tick,
               test_oneoff_on_live_polling_thread]:
        fn()
    print("\nALL diagnostics TESTS PASS")