Files
obdash/tests/test_obdcore.py
justin 0f029b724a Fix #6: bound formula evaluation to stop untrusted-profile DoS
The AST sandbox whitelisted ** and << with no magnitude bound, so a hostile
profile formula (9**9**9, 1<<10**9) computed a multi-hundred-MB integer on the
scheduler thread -> CPU pin + OOM. The scheduler except clause never catches a
runaway/OOM (not a raised exception), and a derived PID with empty deps fires
every tick on connect.

- _apply() guards each BinOp: shift amount <= 256, exponent <= 64, and any int
  result bit_length > 512 raises FormulaError (caught by the scheduler -> sample
  dropped, thread survives).
- compile-time caps: expr length <= 500, AST depth <= 60; parse also catches
  RecursionError.
- test_formula_dos_bounded: giant-int payloads rejected in <0.5s; legit bit ops
  and scaling still work.

Closes #6

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-07-01 19:26:05 -04:00

165 lines
5.9 KiB
Python

"""Hardware-free tests for the obdcore acquisition engine.
Drives the scheduler deterministically with a fake clock + MockLink, so the
prioritized polling, derived channels, dead-PID parking, store min/max, and
record/replay are all validated without a truck or serial port.
Run: python -m pytest tests/ -q (or) python tests/test_obdcore.py
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore import (PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder,
replay_csv, load_default, load_profile, default_profile_path,
list_profiles, compile_formula, FormulaError)
from obdcore.mock import MockLink
class FakeClock:
    """Manually driven clock used in place of a real time source.

    Calling the instance returns the current fake time; the value only
    changes when ``advance`` is called, which keeps scheduler tests
    deterministic.
    """

    def __init__(self):
        # Time starts at zero and never moves on its own.
        self.t = 0.0

    def advance(self, dt):
        """Move the fake time forward by ``dt``."""
        self.t = self.t + dt

    def __call__(self):
        """Return the current fake time."""
        return self.t
def _setup(specs):
    """Build a scheduler wired to a fake clock and a MockLink.

    ``specs`` is handed unchanged to ``PollScheduler.set_subscriptions``.

    Returns a ``(clock, registry, store, scheduler)`` tuple; the MockLink
    itself is not returned.
    """
    clock = FakeClock()
    registry = PidRegistry(load_default())
    samples = TimeSeriesStore()
    # The link and the scheduler share the same fake clock so that time only
    # moves when the test advances it.
    scheduler = PollScheduler(MockLink(clock=clock), registry, samples,
                              clock=clock)
    scheduler.set_subscriptions(specs)
    return clock, registry, samples, scheduler
def test_profiles_load_and_validate():
    """Every listed profile loads, has a name, and each PID is decodable.

    A PID passes if it has a truthy ``decode`` or its ``mode`` is ``"atrv"``.
    """
    profs = list_profiles()
    ford_listed = any("ford-6.0" in path for path, _ in profs)
    assert ford_listed, "ford profile should be listed"

    for path, _meta in profs:
        # load_profile compiles every formula -> raises if bad
        profile = load_profile(path)
        assert profile.meta.get("name")
        for pid in profile.pids:
            assert pid.decode or pid.mode == "atrv"

    print(f" {len(profs)} profiles load + compile clean: OK")
def test_formula_is_sandboxed():
    """A legit formula evaluates; code-like or unknown-name input is rejected.

    Rejection means ``compile_formula`` raises ``FormulaError``.
    """
    # legit
    scale = compile_formula("(A*256+B)*0.57", "ABCDEFGH")
    assert abs(scale({"A": 0, "B": 22}) - 12.54) < 0.01

    # hostile / disallowed -> rejected at compile
    rejected = (
        "__import__('os').system('x')",
        "open('/etc/passwd')",
        "A.__class__",
        "Z+1",
        "A if B else C",
    )
    for bad in rejected:
        try:
            compile_formula(bad, "ABC")
        except FormulaError:
            continue
        else:
            raise AssertionError(f"should have rejected: {bad}")

    print(" formula evaluator rejects code/unknowns: OK")
def test_formula_dos_bounded():
    """Resource-exhaustion formulas are rejected quickly; bit ops still work.

    Each hostile payload must raise ``FormulaError`` from either
    ``compile_formula`` or the call to the compiled function, and must do so
    in under 0.5 seconds. A payload that compiles and evaluates without
    raising fails the test.
    """
    import time

    # giant-integer / resource-exhaustion payloads must be blocked fast, not
    # allowed to freeze the polling thread
    payloads = (
        "9**9**9",
        "1<<10**9",
        "2**5000",
        "10**9**9",
        "A<<100000000",
        "A+" * 300 + "A",  # 601-character expression
    )
    for bad in payloads:
        # perf_counter is monotonic; wall-clock time.time() can jump (e.g. a
        # clock adjustment) and make the 0.5s bound spuriously fail or pass.
        started = time.perf_counter()
        try:
            fn = compile_formula(bad, "AB")
            fn({"A": 250, "B": 250})
        except FormulaError:
            elapsed = time.perf_counter() - started
            assert elapsed < 0.5, f"{bad} was slow to reject"
        else:
            raise AssertionError(f"formula not bounded: {bad}")

    # legit bit-field / scaling formulas still work
    assert compile_formula("(A<<8)|B", "AB")({"A": 1, "B": 2}) == 258
    print(" formula DoS payloads bounded (<0.5s), legit bit ops intact: OK")
def test_registry_decoders_match_truck_bytes():
    """Default-profile decoders turn known raw bytes into expected values.

    Each case is ``(key, raw_bytes, expected)``; the decoded value must be
    within 0.05 of ``expected``.
    """
    registry = PidRegistry(load_default())
    cases = (
        ("ICP", [0x00, 0x16], 12.5),
        ("EBP", [0x01, 0x8F], 14.46),
        ("MAP", [0x01, 0x89], 14.25),
        ("BARO", [0x01, 0x88], 14.21),
        ("EOT", [0x1C, 0x92], 33.1),
        ("GEAR", [0x02], 1),
        ("FICM_M", [0x30, 0x00], 48.0),
    )
    for key, raw, expect in cases:
        decoder = registry.get(key)
        got = decoder.decode(raw)
        assert abs(got - expect) < 0.05, f"{key}: {got} != {expect}"
    print(" decoders match truck bytes: OK")
def test_crank_ramp_and_peak():
    """Simulated crank: ICP ramps, FICM reads 48.0, battery sag is captured.

    Runs 60 scheduler ticks at 50 ms fake-clock steps, then checks the
    store's min/max and latest values. Min/max values are checked for
    ``None`` before being compared, so an empty channel fails with the
    assertion message rather than a ``TypeError``.
    """
    clk, reg, store, sch = _setup([("ICP", 5), ("FICM_M", 2), ("BATT", 2), ("RPM", 2)])
    for _ in range(60):  # ~3s at 50ms steps
        sch.tick()
        clk.advance(0.05)

    # The original bound this result to an unused local. The call is kept in
    # case it has an effect (presumably it fails if the channel is missing --
    # TODO confirm); only the dead binding is dropped.
    store.channel("ICP")

    lo, hi = store.minmax("ICP")
    assert hi is not None and hi >= 500, f"ICP should ramp past 500, got peak {hi}"
    assert lo is not None and lo < 50, "ICP should start low"
    assert store.latest("FICM_M") == 48.0

    bat_lo, _ = store.minmax("BATT")
    assert bat_lo is not None and bat_lo <= 10.7, \
        f"battery sag should be captured, got {bat_lo}"
    print(f" crank ramp: ICP {lo} -> peak {hi}, FICM {store.latest('FICM_M')}V, "
          f"batt min {bat_lo}V: OK")
def test_derived_boost_channel():
    """The derived BOOST channel produces a value close to zero.

    Subscribes MAP, BARO and BOOST, runs 10 ticks, and requires the latest
    BOOST sample to exist and be within 0.5 of zero.
    """
    clock, _registry, store, scheduler = _setup(
        [("MAP", 5), ("BARO", 5), ("BOOST", 5)]
    )
    ticks_left = 10
    while ticks_left > 0:
        scheduler.tick()
        clock.advance(0.05)
        ticks_left -= 1

    boost = store.latest("BOOST")
    failure = f"atm boost ~0, got {boost}"
    assert boost is not None, failure
    assert abs(boost) < 0.5, failure
    print(f" derived BOOST = MAP-BARO = {boost} psi: OK")
def test_dead_pid_parks_and_revives():
    """A PID the link cannot answer is parked without stalling other PIDs.

    After 20 ticks ICP must have data and IPR must have none. The clock is
    then advanced 6.0 and one more tick is run to exercise the revive path.
    That phase is asserted too, where previously it only proved that
    ``tick()`` did not raise.
    """
    clk, reg, store, sch = _setup([("ICP", 5), ("IPR", 5)])  # IPR not in MockLink -> dead
    for _ in range(20):
        sch.tick()
        clk.advance(0.05)
    # IPR should have parked (4 fails) but scheduler still runs ICP
    assert store.latest("ICP") is not None
    assert store.latest("IPR") is None

    # advance past revive window -> IPR re-attempted (still None, but tried)
    clk.advance(6.0)
    sch.tick()
    # The link still has no IPR, so the re-attempt must not invent a sample,
    # and the healthy channel must be unaffected by the revive.
    assert store.latest("IPR") is None
    assert store.latest("ICP") is not None
    print(" dead-PID parking + revive: OK")
def test_record_replay_roundtrip(tmp_path=None):
    """Recording to CSV and replaying it reproduces the peak ICP value.

    Args:
        tmp_path: Optional directory to write the CSV into. When omitted (as
            the ``__main__`` runner does), a private temporary directory is
            created and removed afterwards, even if the test fails. When
            given, the CSV is left in that directory for its owner to clean
            up.

    The previous version always wrote a fixed filename into the shared
    system temp directory, ignored ``tmp_path``, and left the file and an
    open recorder behind when a tick or an assertion failed.
    """
    import contextlib
    import tempfile

    with contextlib.ExitStack() as cleanup:
        if tmp_path is None:
            workdir = cleanup.enter_context(
                tempfile.TemporaryDirectory(prefix="obdcore_"))
        else:
            workdir = os.fspath(tmp_path)
        path = os.path.join(workdir, "obdcore_replay_test.csv")

        clk, reg, store, sch = _setup([("ICP", 5), ("FICM_M", 5)])
        store.recorder = CsvRecorder(path)
        try:
            for _ in range(20):
                sch.tick()
                clk.advance(0.05)
        finally:
            # Close even if a tick raises so the file handle is not leaked.
            store.recorder.close()

        peak_orig = store.minmax("ICP")[1]
        store2 = TimeSeriesStore()
        replay_csv(path, store2)
        peak_replay = store2.minmax("ICP")[1]

        # Guard against empty channels so failure is an assertion, not a
        # TypeError from subtracting None.
        assert peak_orig is not None, "no ICP samples were recorded"
        assert peak_replay is not None, "replay produced no ICP samples"
        assert abs(peak_orig - peak_replay) < 0.01, "replay should reproduce peak ICP"
        print(f" record/replay: peak ICP {peak_orig} == replay {peak_replay}: OK")
if __name__ == "__main__":
    # Script entry point: run every test in order without needing pytest.
    # Any failing assertion propagates and stops the run before the banner.
    all_tests = (
        test_profiles_load_and_validate,
        test_formula_is_sandboxed,
        test_formula_dos_bounded,
        test_registry_decoders_match_truck_bytes,
        test_crank_ramp_and_peak,
        test_derived_boost_channel,
        test_dead_pid_parks_and_revives,
        test_record_replay_roundtrip,
    )
    for run_test in all_tests:
        run_test()
    print("\nALL obdcore TESTS PASS")