"""Controller -- owns the obdcore link/registry/store/scheduler for the GUI. Keeps all acquisition concerns out of the widgets. The GUI subscribes/ unsubscribes PIDs (== what's polled == what's plotted) and reads the store on a timer; the scheduler thread does the serial work. """ import time from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder from obdcore.mock import MockLink # default poll rates (Hz) -- fast for the no-start metrics, slower for the rest FAST = {"ICP", "FICM_M", "RPM"} DEFAULT_HZ = 2 FAST_HZ = 5 class Controller: def __init__(self): self.reg = PidRegistry() self.store = TimeSeriesStore() self.link = None self.sched = None self.t0 = None self.connected = False def connect(self, port=None, baud=38400, mock=False): if mock: self.link = MockLink(clock=time.time) else: from obdcore.link import ElmLink # imported lazily (needs pyserial) self.link = ElmLink(port, baud) self.link.init() ok = self.link.connect() try: self.link.fast_timing(True) except Exception: pass self.sched = PollScheduler(self.link, self.reg, self.store, clock=time.time) self.t0 = time.time() self.connected = True return ok def hz_for(self, key): return FAST_HZ if key in FAST else DEFAULT_HZ def subscribe(self, key): if self.sched: self.sched.subscribe(key, self.hz_for(key)) def unsubscribe(self, key): if self.sched: self.sched.unsubscribe(key) def subscribed(self): return set(self.sched.subscriptions()) if self.sched else set() def start(self): if self.sched: self.sched.start() def record(self, path): self.store.recorder = CsvRecorder(path) def stop_record(self): if self.store.recorder: self.store.recorder.close() self.store.recorder = None def now(self): return (time.time() - self.t0) if self.t0 else 0.0 def stop(self): if self.sched: self.sched.stop() self.sched = None self.stop_record() if self.link: try: self.link.fast_timing(False) except Exception: pass self.link.close() self.link = None self.connected = False