diff --git a/obdcore/formula.py b/obdcore/formula.py index d33ac46..f7ac300 100644 --- a/obdcore/formula.py +++ b/obdcore/formula.py @@ -1,8 +1,9 @@ """Safe formula evaluator for vehicle-profile PID scaling. Profiles are community-contributed data, so decode formulas must NOT be able to -execute arbitrary code. Formulas are arithmetic expressions over named -variables -- the de-facto OBD convention used by Torque / FORScan / ScanGauge: +execute arbitrary code -- OR exhaust CPU/memory. Formulas are arithmetic +expressions over named variables -- the de-facto OBD convention used by Torque / +FORScan / ScanGauge: raw-mode PIDs: variables A, B, C, ... = response data bytes 0, 1, 2, ... e.g. "(A*256+B)*0.57" "A-40" "(A>>1)&1" "A//2" @@ -10,9 +11,11 @@ variables -- the de-facto OBD convention used by Torque / FORScan / ScanGauge: e.g. "MAP - BARO" Only numeric literals, the named variables, arithmetic/bitwise operators, and a -small whitelist of functions are allowed. No names, attributes, subscripts, -comprehensions, or calls outside the whitelist -- anything else raises -FormulaError at compile time, so a bad/hostile profile fails loudly on load. +small whitelist of functions are allowed. Anything else raises FormulaError at +compile time. To stop a hostile profile from freezing the acquisition thread +with a giant-integer expression (e.g. `9**9**9`, `1<<10**9`), evaluation also +BOUNDS magnitude: shift/exponent amounts and integer result bit-lengths are +capped, and expression length + nesting depth are limited at compile. """ import ast import operator @@ -28,24 +31,33 @@ _UNARY = {ast.USub: operator.neg, ast.UAdd: operator.pos, ast.Invert: operator.i _FUNCS = {"min": min, "max": max, "abs": abs, "round": round, "int": int, "float": float} +# magnitude / complexity limits (far above any real OBD byte arithmetic) +MAX_RESULT_BITS = 512 # ~155 decimal digits; real decode stays < 32 bits +MAX_SHIFT = 256 # bit-field decode never shifts more than a few bytes +MAX_POW_EXP = 64 +MAX_EXPR_LEN = 500 +MAX_DEPTH = 60 + class FormulaError(ValueError): pass -def _validate(node, allowed): +def _validate(node, allowed, depth=0): + if depth > MAX_DEPTH: + raise FormulaError("formula too deeply nested") if isinstance(node, ast.Expression): - return _validate(node.body, allowed) + return _validate(node.body, allowed, depth + 1) if isinstance(node, ast.BinOp): if type(node.op) not in _BIN: raise FormulaError(f"operator not allowed: {type(node.op).__name__}") - _validate(node.left, allowed) - _validate(node.right, allowed) + _validate(node.left, allowed, depth + 1) + _validate(node.right, allowed, depth + 1) return if isinstance(node, ast.UnaryOp): if type(node.op) not in _UNARY: raise FormulaError(f"unary op not allowed: {type(node.op).__name__}") - _validate(node.operand, allowed) + _validate(node.operand, allowed, depth + 1) return if isinstance(node, ast.Constant): if not isinstance(node.value, (int, float)) or isinstance(node.value, bool): @@ -61,16 +73,42 @@ def _validate(node, allowed): if node.keywords: raise FormulaError("keyword args not allowed") for a in node.args: - _validate(a, allowed) + _validate(a, allowed, depth + 1) return raise FormulaError(f"expression not allowed: {type(node).__name__}") +def _apply(op_type, left, right): + """Apply a binary op with magnitude guards so an untrusted formula can't + allocate a giant integer (Pow / shift amplification).""" + if op_type in (ast.LShift, ast.RShift): + try: + r = operator.index(right) + except TypeError: + raise FormulaError("shift amount must be an integer") + if not 0 <= r <= MAX_SHIFT: + raise FormulaError("shift amount out of range") + if op_type is ast.LShift and isinstance(left, int) and \ + left.bit_length() + r > MAX_RESULT_BITS: + raise FormulaError("shift result too large") + elif op_type is ast.Pow: + if isinstance(right, int): + if right > MAX_POW_EXP: + raise FormulaError("exponent too large") + if isinstance(left, int) and right > 0 and \ + left.bit_length() * right > MAX_RESULT_BITS: + raise FormulaError("power result too large") + res = _BIN[op_type](left, right) + if isinstance(res, int) and res.bit_length() > MAX_RESULT_BITS: + raise FormulaError("result magnitude too large") + return res + + def _eval(node, names): if isinstance(node, ast.Expression): return _eval(node.body, names) if isinstance(node, ast.BinOp): - return _BIN[type(node.op)](_eval(node.left, names), _eval(node.right, names)) + return _apply(type(node.op), _eval(node.left, names), _eval(node.right, names)) if isinstance(node, ast.UnaryOp): return _UNARY[type(node.op)](_eval(node.operand, names)) if isinstance(node, ast.Constant): @@ -84,9 +122,11 @@ def _eval(node, names): def compile_formula(expr, allowed_names): """Return fn(names_dict) -> number. Raises FormulaError on disallowed input.""" + if len(expr) > MAX_EXPR_LEN: + raise FormulaError("formula too long") try: tree = ast.parse(expr, mode="eval") - except SyntaxError as e: + except (SyntaxError, ValueError, RecursionError) as e: raise FormulaError(f"bad formula {expr!r}: {e}") allowed = set(allowed_names) _validate(tree, allowed) diff --git a/tests/test_obdcore.py b/tests/test_obdcore.py index 8257580..9233a67 100644 --- a/tests/test_obdcore.py +++ b/tests/test_obdcore.py @@ -63,6 +63,24 @@ def test_formula_is_sandboxed(): print(" formula evaluator rejects code/unknowns: OK") +def test_formula_dos_bounded(): + import time + # giant-integer / resource-exhaustion payloads must be blocked fast, not + # allowed to freeze the polling thread + for bad in ("9**9**9", "1<<10**9", "2**5000", "10**9**9", "A<<100000000", + "A+" * 300 + "A"): + t = time.time() + try: + fn = compile_formula(bad, "AB") + fn({"A": 250, "B": 250}) + raise AssertionError(f"formula not bounded: {bad}") + except FormulaError: + assert time.time() - t < 0.5, f"{bad} was slow to reject" + # legit bit-field / scaling formulas still work + assert compile_formula("(A<<8)|B", "AB")({"A": 1, "B": 2}) == 258 + print(" formula DoS payloads bounded (<0.5s), legit bit ops intact: OK") + + def test_registry_decoders_match_truck_bytes(): reg = PidRegistry(load_default()) cases = { @@ -138,6 +156,7 @@ def test_record_replay_roundtrip(tmp_path=None): if __name__ == "__main__": for fn in [test_profiles_load_and_validate, test_formula_is_sandboxed, + test_formula_dos_bounded, test_registry_decoders_match_truck_bytes, test_crank_ramp_and_peak, test_derived_boost_channel, test_dead_pid_parks_and_revives, test_record_replay_roundtrip]: