Files
obdash/tests/test_transport.py
justin 39fcf3fb55 Fix #10 + #11: transport hardening + controller resource leaks
#10 transport (obdcore/transport.py):
- TcpTransport.read raises IOError on a real socket error or peer-close instead
  of swallowing it as a timeout, so a dead WiFi link surfaces (via the #8 poll
  handler) as 'connection lost' rather than a frozen dashboard.
- TcpTransport.reset_input_buffer drains at most 64 chunks — never spins forever.
- BleTransport closes the client + stops the event-loop thread on connect
  timeout (no leak), caps the notification buffer at 64 KiB, and close() is
  robust when only partially initialised.

#11 controller (gui/controller.py, obdcore/store.py):
- connect() closes the transport and nulls the link if init()/connect() raises,
  so a failed/retried connect doesn't orphan sockets/threads.
- stop_record() unhooks store.recorder BEFORE closing it, and CsvRecorder now
  has a 'closed' guard so a poll-thread write racing close() is a no-op instead
  of an I/O-on-closed-file crash.

Closes #10
Closes #11

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-07-01 19:39:47 -04:00

129 lines
3.9 KiB
Python

"""Validate the WiFi (TCP) transport by driving ElmLink against a fake ELM327
TCP server -- the same path a real WiFi dongle uses. No hardware needed.
"""
import os
import socket
import sys
import threading
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore.link import ElmLink
from obdcore.transport import TcpTransport
# 17-character VIN that the fake dongle hex-encodes into its "0902" reply and
# that the WiFi transport test expects to read back unchanged.
VIN = "1FMZU73E12ZA12345"
def _response(cmd):
    """Return the canned reply of the fake ELM327 for one command line.

    The command is matched after stripping surrounding whitespace and
    upper-casing. Every reply ends with a carriage return and the ``>``
    prompt. Commands with no canned answer get ``NO DATA``.
    """
    request = cmd.strip().upper()

    # Fixed replies, keyed by the exact command text.
    canned = {
        "ATZ": "ELM327 v1.5",
        "0100": "41 00 BE 3E B8 11",
        "010C": "41 0C 1B E8",  # RPM 1786
        "0101": "41 01 00 07 61 20",  # readiness
    }

    if request in canned:
        payload = canned[request]
    elif request.startswith("AT"):
        # Any other AT command is simply acknowledged.
        payload = "OK"
    elif request == "0902":  # VIN
        # Built on demand so the module-level VIN is only read for this request.
        vin_hex = " ".join(f"{ord(ch):02X}" for ch in VIN)
        payload = "49 02 01 " + vin_hex
    else:
        payload = "NO DATA"
    return payload + "\r>"
class FakeElmServer:
    """Single-client fake ELM327 listening on an ephemeral loopback TCP port.

    Construction binds 127.0.0.1 on an OS-chosen port (exposed as ``port``)
    and starts a daemon thread that accepts one connection and answers each
    carriage-return-terminated command with ``_response``.
    """

    def __init__(self):
        self.sock = socket.socket()
        self.sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        self.sock.listen(1)
        self.port = self.sock.getsockname()[1]
        self._run = True  # cleared by stop() to end the serve loop
        self.t = threading.Thread(target=self._serve, daemon=True)
        self.t.start()

    def _serve(self):
        """Accept one client and answer its commands until stopped or closed."""
        try:
            conn, _ = self.sock.accept()
        except OSError:
            # The listening socket was closed before any client connected.
            return
        # The context manager closes the accepted socket on every exit path,
        # so the descriptor is not leaked when the loop ends.
        with conn:
            # Short timeout so the loop re-checks self._run periodically.
            conn.settimeout(2.0)
            buf = b""
            while self._run:
                try:
                    data = conn.recv(64)
                except socket.timeout:
                    continue
                except OSError:
                    break
                if not data:
                    # Empty read: the client closed its end.
                    break
                buf += data
                # Answer every complete command; keep any partial tail buffered.
                while b"\r" in buf:
                    line, buf = buf.split(b"\r", 1)
                    reply = _response(line.decode("ascii", "ignore")).encode()
                    try:
                        conn.sendall(reply)
                    except OSError:
                        # Client went away mid-reply; nothing left to serve.
                        return

    def stop(self):
        """Ask the serve loop to exit and close the listening socket."""
        self._run = False
        try:
            self.sock.close()
        except OSError:
            # Best-effort teardown: a failed close must not fail the test.
            pass
def test_wifi_transport():
    """Drive ElmLink over TcpTransport against the in-process fake ELM327.

    Checks connect, an RPM read, the readiness summary and the VIN against
    the canned replies served by FakeElmServer. The link is always closed
    and the server always stopped, even when an assertion fails.
    """
    srv = FakeElmServer()
    try:
        link = ElmLink(TcpTransport("127.0.0.1", srv.port))
        try:
            # Raw-command smoke call before init(). The fake answers ATI with a
            # plain "OK" (only ATZ returns the ELM327 banner), so the reply
            # text is not asserted; the earlier assert here ended in
            # `or True` and could never fail.
            if not link.cmd("ATI"):
                link.cmd("ATZ")
            link.init()
            assert link.connect() is True, "0100 should be answered over TCP"
            rpm_raw = link.read_m01("0C", 2)
            assert rpm_raw == [0x1B, 0xE8]
            # Two data bytes form a 16-bit value; dividing by 4 yields RPM.
            rpm = (rpm_raw[0] * 256 + rpm_raw[1]) / 4
            assert abs(rpm - 1786) < 1, rpm
            r = link.read_readiness()
            assert r and r["total"] == 6 and r["ready_count"] == 5
            info = link.read_vehicle_info()
            assert info["vin"] == VIN, info
            print(f" WiFi/TCP: connect, RPM {rpm:.0f}, readiness "
                  f"{r['ready_count']}/{r['total']}, VIN {info['vin']}: OK")
        finally:
            # Release the client socket even when an assertion above failed.
            link.close()
    finally:
        srv.stop()
def test_tcp_read_raises_on_closed_peer():
    """TcpTransport.read must raise IOError once the peer closed the socket.

    A dead WiFi connection must surface as an error, not silently look like a
    perpetual timeout (which left the app frozen on "Connected").
    """
    srv = socket.socket()
    try:
        srv.bind(("127.0.0.1", 0))
        srv.listen(1)
        port = srv.getsockname()[1]

        def serve():
            c, _ = srv.accept()
            c.close()  # drop the client immediately

        dropper = threading.Thread(target=serve, daemon=True)
        dropper.start()
        t = TcpTransport("127.0.0.1", port)
        try:
            # Wait until the server side has really closed the connection,
            # rather than sleeping a fixed interval and hoping it has.
            dropper.join(timeout=1.0)
            raised = False
            try:
                # Allow a few reads in case the first ones only time out.
                for _ in range(5):
                    t.read(64)
            except IOError:
                raised = True
            assert raised, "read should raise IOError when the peer closed the socket"
        finally:
            # Close the transport even when the assertion fails.
            t.close()
    finally:
        srv.close()
    print(" TCP dead-connection detected (read raises, not silent): OK")
def test_factory_helpers():
    """Check that ElmLink exposes the serial, tcp and ble factory attributes."""
    # Presence check only: the factories themselves are not invoked here.
    factory_names = ("serial", "tcp", "ble")
    assert all(hasattr(ElmLink, name) for name in factory_names)
    print(" ElmLink.serial/tcp/ble factories present: OK")
if __name__ == "__main__":
    # Script mode: run each test in order. A failure raises, so the final
    # banner is only printed when every test passed.
    for run_test in (
        test_wifi_transport,
        test_tcp_read_raises_on_closed_peer,
        test_factory_helpers,
    ):
        run_test()
    print("\nALL TRANSPORT TESTS PASS")