Tier 2: WiFi + Bluetooth ELM327 transports
- obdcore/transport.py: pluggable byte transports -- SerialTransport,
TcpTransport (WiFi ELM327, stdlib socket), BleTransport (experimental, via
optional 'bleak'; background asyncio loop buffering notifications). ble_scan().
- ElmLink refactored onto a transport with .serial()/.tcp()/.ble() factories
(close/cmd now go through self.io); no behavior change for serial.
- Controller.connect(conn={kind:serial|wifi|ble,...}); GUI connection bar gains
a transport selector (Serial/USB/BT-SPP | WiFi host:port | Bluetooth LE + Scan).
- Classic-Bluetooth needs no new code (pairs as a serial port); WiFi needs no
extra deps; BLE is opt-in (bleak not bundled, so CI binaries keep building).
- tests/test_transport.py: drives ElmLink over a fake ELM TCP server end-to-end
(connect, RPM, readiness, VIN). All suites pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
@@ -83,15 +83,23 @@ all ship wheels for Windows / macOS (incl. Apple Silicon) / Linux.
|
||||
|
||||
## Connecting to a vehicle
|
||||
|
||||
Plug an **ELM327 (USB or Bluetooth)** into the OBD-II port, turn the key to RUN,
|
||||
then pick the port and **Connect**. The CH340 USB adapters need a one-time driver:
|
||||
Pick the adapter type in the toolbar, then **Connect** (key to RUN). OBDash speaks
|
||||
to any ELM327 over three transports:
|
||||
|
||||
- **Windows** — WCH `CH341SER`; shows as `USB-SERIAL CH340 (COMx)` in Device Manager.
|
||||
- **macOS** — WCH `CH34xVCPDriver` (Mac App Store / wch.cn); port `/dev/cu.wchusbserial*`.
|
||||
- **Linux** — `ch341` is in the kernel (no install); port `/dev/ttyUSB0` (add yourself
|
||||
to the `dialout` group: `sudo usermod -aG dialout $USER`).
|
||||
- **Serial / USB / classic-Bluetooth** — the port dropdown. USB CH340 adapters need
|
||||
a one-time driver:
|
||||
- **Windows** — WCH `CH341SER`; shows as `USB-SERIAL CH340 (COMx)`.
|
||||
- **macOS** — WCH `CH34xVCPDriver`; port `/dev/cu.wchusbserial*`.
|
||||
- **Linux** — kernel `ch341` (no install); `/dev/ttyUSB0` (add yourself to `dialout`).
|
||||
- **Classic-Bluetooth** ELM327 pair as a serial port (COMx / `/dev/cu.*` / `rfcomm`)
|
||||
— pair in your OS, then pick that port here.
|
||||
- **WiFi** — for WiFi ELM327 dongles: enter the host/port (default `192.168.0.10:35000`).
|
||||
Connect to the dongle's WiFi network first. No driver needed.
|
||||
- **Bluetooth LE** — Scan and pick the device. BLE support needs `pip install bleak`
|
||||
(optional; not bundled in the prebuilt binaries). BLE dongle GATT layouts vary, so
|
||||
this is experimental.
|
||||
|
||||
Default baud is 38400; the ELM327 auto-negotiates the vehicle's protocol.
|
||||
Default serial baud is 38400; the ELM327 auto-negotiates the vehicle's protocol.
|
||||
|
||||
## Vehicle profiles
|
||||
|
||||
|
||||
+11
-2
@@ -55,12 +55,21 @@ class Controller:
|
||||
self.reg = PidRegistry(self.profile)
|
||||
self.dtcdb = DtcDatabase(self.profile)
|
||||
|
||||
def connect(self, port=None, baud=38400, mock=False):
|
||||
def connect(self, port=None, baud=38400, mock=False, conn=None):
|
||||
"""conn: optional {'kind': 'serial'|'wifi'|'ble', ...}. Falls back to
|
||||
serial(port, baud) for backward compatibility."""
|
||||
if mock:
|
||||
self.link = MockLink(clock=time.time)
|
||||
else:
|
||||
from obdcore.link import ElmLink # imported lazily (needs pyserial)
|
||||
self.link = ElmLink(port, baud)
|
||||
c = conn or {"kind": "serial", "port": port, "baud": baud}
|
||||
kind = c.get("kind", "serial")
|
||||
if kind == "wifi":
|
||||
self.link = ElmLink.tcp(c["host"], c.get("port", 35000))
|
||||
elif kind == "ble":
|
||||
self.link = ElmLink.ble(c["address"])
|
||||
else:
|
||||
self.link = ElmLink.serial(c.get("port", port), c.get("baud", baud))
|
||||
self.link.init()
|
||||
ok = self.link.connect()
|
||||
try:
|
||||
|
||||
+82
-13
@@ -156,16 +156,44 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
tb = QtWidgets.QToolBar("Connection")
|
||||
tb.setMovable(False)
|
||||
self.addToolBar(tb)
|
||||
tb.addWidget(QtWidgets.QLabel(" Port "))
|
||||
self.port_combo = QtWidgets.QComboBox()
|
||||
self.port_combo.setMinimumWidth(180)
|
||||
|
||||
self.conn_kind = QtWidgets.QComboBox()
|
||||
self.conn_kind.addItems(["Serial / USB / BT-SPP", "WiFi", "Bluetooth LE"])
|
||||
self.conn_kind.currentIndexChanged.connect(self._conn_kind_changed)
|
||||
tb.addWidget(self.conn_kind)
|
||||
|
||||
# serial inputs
|
||||
self._serial_w = []
|
||||
self._serial_w.append(tb.addWidget(QtWidgets.QLabel(" Port ")))
|
||||
self.port_combo = QtWidgets.QComboBox(); self.port_combo.setMinimumWidth(180)
|
||||
self._refresh_ports()
|
||||
tb.addWidget(self.port_combo)
|
||||
self._serial_w.append(tb.addWidget(self.port_combo))
|
||||
b = QtWidgets.QToolButton(); b.setText("↻"); b.clicked.connect(self._refresh_ports)
|
||||
tb.addWidget(b)
|
||||
tb.addWidget(QtWidgets.QLabel(" Baud "))
|
||||
self.baud_edit = QtWidgets.QLineEdit("38400"); self.baud_edit.setFixedWidth(70)
|
||||
tb.addWidget(self.baud_edit)
|
||||
self._serial_w.append(tb.addWidget(b))
|
||||
self._serial_w.append(tb.addWidget(QtWidgets.QLabel(" Baud ")))
|
||||
self.baud_edit = QtWidgets.QLineEdit("38400"); self.baud_edit.setFixedWidth(64)
|
||||
self._serial_w.append(tb.addWidget(self.baud_edit))
|
||||
|
||||
# wifi inputs
|
||||
self._wifi_w = []
|
||||
self._wifi_w.append(tb.addWidget(QtWidgets.QLabel(" Host ")))
|
||||
self.host_edit = QtWidgets.QLineEdit("192.168.0.10"); self.host_edit.setFixedWidth(120)
|
||||
self._wifi_w.append(tb.addWidget(self.host_edit))
|
||||
self._wifi_w.append(tb.addWidget(QtWidgets.QLabel(" : ")))
|
||||
self.tcpport_edit = QtWidgets.QLineEdit("35000"); self.tcpport_edit.setFixedWidth(56)
|
||||
self._wifi_w.append(tb.addWidget(self.tcpport_edit))
|
||||
|
||||
# ble inputs
|
||||
self._ble_w = []
|
||||
self._ble_w.append(tb.addWidget(QtWidgets.QLabel(" Device ")))
|
||||
self.ble_combo = QtWidgets.QComboBox(); self.ble_combo.setMinimumWidth(200)
|
||||
self.ble_combo.setEditable(True)
|
||||
self._ble_w.append(tb.addWidget(self.ble_combo))
|
||||
self.ble_scan_btn = QtWidgets.QToolButton(); self.ble_scan_btn.setText("Scan")
|
||||
self.ble_scan_btn.clicked.connect(self._ble_scan)
|
||||
self._ble_w.append(tb.addWidget(self.ble_scan_btn))
|
||||
|
||||
tb.addSeparator()
|
||||
self.mock_chk = QtWidgets.QCheckBox("Mock"); tb.addWidget(self.mock_chk)
|
||||
self.connect_btn = QtWidgets.QPushButton("Connect")
|
||||
self.connect_btn.clicked.connect(self._toggle_connect)
|
||||
@@ -174,6 +202,35 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
self._preset_tb = tb
|
||||
self._preset_sep = tb.addSeparator()
|
||||
self._preset_buttons = []
|
||||
self._conn_kind_changed()
|
||||
|
||||
def _conn_kind_changed(self):
|
||||
k = self.conn_kind.currentIndex()
|
||||
for a in self._serial_w:
|
||||
a.setVisible(k == 0)
|
||||
for a in self._wifi_w:
|
||||
a.setVisible(k == 1)
|
||||
for a in self._ble_w:
|
||||
a.setVisible(k == 2)
|
||||
|
||||
def _ble_scan(self):
|
||||
try:
|
||||
from obdcore.transport import ble_scan
|
||||
except Exception:
|
||||
QtWidgets.QMessageBox.information(self, "Bluetooth LE",
|
||||
"BLE needs the 'bleak' package (pip install bleak). Classic-Bluetooth "
|
||||
"ELM327 pair as a serial port — use Serial instead.")
|
||||
return
|
||||
self.status.showMessage("Scanning for BLE devices…")
|
||||
QtWidgets.QApplication.processEvents()
|
||||
try:
|
||||
devs = ble_scan(timeout=6.0)
|
||||
except Exception as e:
|
||||
QtWidgets.QMessageBox.critical(self, "BLE scan failed", str(e)); return
|
||||
self.ble_combo.clear()
|
||||
for addr, name in devs:
|
||||
self.ble_combo.addItem(f"{name} [{addr}]", addr)
|
||||
self.status.showMessage(f"Found {len(devs)} BLE device(s).")
|
||||
|
||||
def _rebuild_preset_buttons(self):
|
||||
for b in self._preset_buttons:
|
||||
@@ -593,16 +650,28 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
if not ports:
|
||||
self.port_combo.addItem("(no ports found)", None)
|
||||
|
||||
def _toggle_connect(self):
|
||||
if self.ctl.connected:
|
||||
self._disconnect(); return
|
||||
port = self.port_combo.currentData()
|
||||
def _conn_spec(self):
|
||||
k = self.conn_kind.currentIndex()
|
||||
if k == 1:
|
||||
try:
|
||||
p = int(self.tcpport_edit.text())
|
||||
except ValueError:
|
||||
p = 35000
|
||||
return {"kind": "wifi", "host": self.host_edit.text().strip(), "port": p}
|
||||
if k == 2:
|
||||
addr = self.ble_combo.currentData() or self.ble_combo.currentText().strip()
|
||||
return {"kind": "ble", "address": addr}
|
||||
try:
|
||||
baud = int(self.baud_edit.text())
|
||||
except ValueError:
|
||||
baud = 38400
|
||||
return {"kind": "serial", "port": self.port_combo.currentData(), "baud": baud}
|
||||
|
||||
def _toggle_connect(self):
|
||||
if self.ctl.connected:
|
||||
self._disconnect(); return
|
||||
try:
|
||||
ok = self.ctl.connect(port=port, baud=baud, mock=self.mock_chk.isChecked())
|
||||
ok = self.ctl.connect(mock=self.mock_chk.isChecked(), conn=self._conn_spec())
|
||||
except Exception as e:
|
||||
QtWidgets.QMessageBox.critical(self, "Connect failed", str(e)); return
|
||||
self.ctl.start(); self.timer.start()
|
||||
|
||||
+24
-9
@@ -74,25 +74,40 @@ def find_ports():
|
||||
class ElmLink:
|
||||
PROMPT = b">"
|
||||
|
||||
def __init__(self, port, baud=38400, verbose=False):
|
||||
if serial is None:
|
||||
raise RuntimeError("pyserial not installed (pip install pyserial)")
|
||||
def __init__(self, transport, verbose=False):
|
||||
"""transport: any object with write/read/reset_input_buffer/close.
|
||||
Use the .serial() / .tcp() / .ble() factory helpers to build one."""
|
||||
self.io = transport
|
||||
self.verbose = verbose
|
||||
self.ser = serial.Serial(port, baud, timeout=0.2)
|
||||
self.protocol = "?"
|
||||
time.sleep(0.3)
|
||||
self.ser.reset_input_buffer()
|
||||
self.io.reset_input_buffer()
|
||||
|
||||
@classmethod
|
||||
def serial(cls, port, baud=38400, **kw):
|
||||
from . import transport as tp
|
||||
return cls(tp.SerialTransport(port, baud), **kw)
|
||||
|
||||
@classmethod
|
||||
def tcp(cls, host, port=35000, **kw):
|
||||
from . import transport as tp
|
||||
return cls(tp.TcpTransport(host, port), **kw)
|
||||
|
||||
@classmethod
|
||||
def ble(cls, address, **kw):
|
||||
from . import transport as tp
|
||||
return cls(tp.BleTransport(address), **kw)
|
||||
|
||||
# -- low-level --
|
||||
def cmd(self, s, settle=0.0, timeout=4.0):
|
||||
self.ser.reset_input_buffer()
|
||||
self.ser.write((s + "\r").encode())
|
||||
self.io.reset_input_buffer()
|
||||
self.io.write((s + "\r").encode())
|
||||
if settle:
|
||||
time.sleep(settle)
|
||||
buf = bytearray()
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
chunk = self.ser.read(256)
|
||||
chunk = self.io.read(256)
|
||||
if chunk:
|
||||
buf += chunk
|
||||
if self.PROMPT in buf:
|
||||
@@ -209,6 +224,6 @@ class ElmLink:
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.ser.close()
|
||||
self.io.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,184 @@
|
||||
"""Byte transports for ElmLink: serial (USB / classic-Bluetooth SPP), TCP
|
||||
(WiFi ELM327), and BLE (Bluetooth Low Energy ELM327).
|
||||
|
||||
Each transport presents the same tiny synchronous interface so ElmLink's
|
||||
command loop doesn't care how the bytes move:
|
||||
|
||||
write(data: bytes) -> None
|
||||
read(n: int) -> bytes (returns b"" on timeout)
|
||||
reset_input_buffer() -> None
|
||||
close() -> None
|
||||
|
||||
Classic Bluetooth ELM327 pair as a serial port (COMx / /dev/cu.* / rfcomm), so
|
||||
they use SerialTransport. Only WiFi (TCP) and BLE need dedicated transports.
|
||||
"""
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
class SerialTransport:
|
||||
def __init__(self, port, baud=38400):
|
||||
import serial
|
||||
self.ser = serial.Serial(port, baud, timeout=0.2)
|
||||
|
||||
def write(self, data):
|
||||
self.ser.write(data)
|
||||
|
||||
def read(self, n):
|
||||
return self.ser.read(n)
|
||||
|
||||
def reset_input_buffer(self):
|
||||
self.ser.reset_input_buffer()
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.ser.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class TcpTransport:
|
||||
"""WiFi ELM327 — a raw TCP socket (commonly 192.168.0.10:35000)."""
|
||||
|
||||
def __init__(self, host, port=35000, connect_timeout=5.0, read_timeout=0.2):
|
||||
self._rt = read_timeout
|
||||
self.sock = socket.create_connection((host, int(port)), timeout=connect_timeout)
|
||||
self.sock.settimeout(read_timeout)
|
||||
|
||||
def write(self, data):
|
||||
self.sock.sendall(data)
|
||||
|
||||
def read(self, n):
|
||||
try:
|
||||
return self.sock.recv(n)
|
||||
except socket.timeout:
|
||||
return b""
|
||||
except OSError:
|
||||
return b""
|
||||
|
||||
def reset_input_buffer(self):
|
||||
self.sock.settimeout(0.05)
|
||||
try:
|
||||
while self.sock.recv(4096):
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
self.sock.settimeout(self._rt)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# Common BLE UART characteristic pairs used by cheap ELM327 dongles
|
||||
# (write-char, notify-char). We try each until one has both properties.
|
||||
_BLE_UART_PAIRS = [
|
||||
("0000fff2-0000-1000-8000-00805f9b34fb", "0000fff1-0000-1000-8000-00805f9b34fb"),
|
||||
("0000ffe1-0000-1000-8000-00805f9b34fb", "0000ffe1-0000-1000-8000-00805f9b34fb"),
|
||||
("6e400002-b5a3-f393-e0a9-e50e24dcca9e", "6e400003-b5a3-f393-e0a9-e50e24dcca9e"),
|
||||
]
|
||||
|
||||
|
||||
class BleTransport:
|
||||
"""Experimental BLE ELM327 transport (needs `bleak`). Runs an asyncio loop
|
||||
on a background thread and buffers notifications so read()/write() stay
|
||||
synchronous. Untested against every dongle -- BLE ELM327 GATT layouts vary."""
|
||||
|
||||
def __init__(self, address, connect_timeout=15.0):
|
||||
try:
|
||||
import bleak # noqa: F401
|
||||
except ImportError as e:
|
||||
raise RuntimeError("BLE support needs the 'bleak' package "
|
||||
"(pip install bleak)") from e
|
||||
self.address = address
|
||||
self._buf = bytearray()
|
||||
self._lock = threading.Lock()
|
||||
self._loop = None
|
||||
self._client = None
|
||||
self._write_char = None
|
||||
self._ready = threading.Event()
|
||||
self._err = None
|
||||
self._thread = threading.Thread(target=self._run, daemon=True)
|
||||
self._thread.start()
|
||||
if not self._ready.wait(connect_timeout) or self._err:
|
||||
raise RuntimeError(f"BLE connect failed: {self._err or 'timeout'}")
|
||||
|
||||
def _run(self):
|
||||
import asyncio
|
||||
from bleak import BleakClient
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
|
||||
async def setup():
|
||||
self._client = BleakClient(self.address)
|
||||
await self._client.connect()
|
||||
notify_char = None
|
||||
for svc in self._client.services:
|
||||
for ch in svc.characteristics:
|
||||
props = set(ch.properties)
|
||||
if {"write", "write-without-response"} & props and self._write_char is None:
|
||||
self._write_char = ch.uuid
|
||||
if {"notify", "indicate"} & props and notify_char is None:
|
||||
notify_char = ch.uuid
|
||||
# prefer a known UART pair if present
|
||||
for w, n in _BLE_UART_PAIRS:
|
||||
if any(c.uuid == w for s in self._client.services for c in s.characteristics):
|
||||
self._write_char = w
|
||||
notify_char = n
|
||||
break
|
||||
if not self._write_char or not notify_char:
|
||||
raise RuntimeError("no writable/notify characteristic found")
|
||||
|
||||
def on_notify(_h, data):
|
||||
with self._lock:
|
||||
self._buf += bytes(data)
|
||||
|
||||
await self._client.start_notify(notify_char, on_notify)
|
||||
self._ready.set()
|
||||
|
||||
try:
|
||||
self._loop.run_until_complete(setup())
|
||||
self._loop.run_forever()
|
||||
except Exception as e:
|
||||
self._err = e
|
||||
self._ready.set()
|
||||
|
||||
def write(self, data):
|
||||
import asyncio
|
||||
fut = asyncio.run_coroutine_threadsafe(
|
||||
self._client.write_gatt_char(self._write_char, data, response=False), self._loop)
|
||||
fut.result(timeout=3.0)
|
||||
|
||||
def read(self, n):
|
||||
deadline = time.time() + 0.2
|
||||
while time.time() < deadline:
|
||||
with self._lock:
|
||||
if self._buf:
|
||||
out = bytes(self._buf[:n]); del self._buf[:n]
|
||||
return out
|
||||
time.sleep(0.005)
|
||||
return b""
|
||||
|
||||
def reset_input_buffer(self):
|
||||
with self._lock:
|
||||
self._buf.clear()
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
import asyncio
|
||||
asyncio.run_coroutine_threadsafe(self._client.disconnect(), self._loop).result(timeout=3.0)
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def ble_scan(timeout=6.0):
|
||||
"""Return [(address, name), ...] of nearby BLE devices (needs bleak)."""
|
||||
import asyncio
|
||||
from bleak import BleakScanner
|
||||
devs = asyncio.run(BleakScanner.discover(timeout=timeout))
|
||||
return [(d.address, d.name or "?") for d in devs]
|
||||
@@ -0,0 +1,101 @@
|
||||
"""Validate the WiFi (TCP) transport by driving ElmLink against a fake ELM327
|
||||
TCP server -- the same path a real WiFi dongle uses. No hardware needed.
|
||||
"""
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from obdcore.link import ElmLink
|
||||
from obdcore.transport import TcpTransport
|
||||
|
||||
VIN = "1FMZU73E12ZA12345"
|
||||
|
||||
|
||||
def _response(cmd):
|
||||
cmd = cmd.strip().upper()
|
||||
if cmd == "ATZ":
|
||||
return "ELM327 v1.5\r>"
|
||||
if cmd.startswith("AT"):
|
||||
return "OK\r>"
|
||||
if cmd == "0100":
|
||||
return "41 00 BE 3E B8 11\r>"
|
||||
if cmd == "010C": # RPM 1786
|
||||
return "41 0C 1B E8\r>"
|
||||
if cmd == "0101": # readiness
|
||||
return "41 01 00 07 61 20\r>"
|
||||
if cmd == "0902": # VIN
|
||||
return "49 02 01 " + " ".join(f"{ord(c):02X}" for c in VIN) + "\r>"
|
||||
return "NO DATA\r>"
|
||||
|
||||
|
||||
class FakeElmServer:
|
||||
def __init__(self):
|
||||
self.sock = socket.socket()
|
||||
self.sock.bind(("127.0.0.1", 0))
|
||||
self.sock.listen(1)
|
||||
self.port = self.sock.getsockname()[1]
|
||||
self._run = True
|
||||
self.t = threading.Thread(target=self._serve, daemon=True)
|
||||
self.t.start()
|
||||
|
||||
def _serve(self):
|
||||
conn, _ = self.sock.accept()
|
||||
conn.settimeout(2.0)
|
||||
buf = b""
|
||||
while self._run:
|
||||
try:
|
||||
data = conn.recv(64)
|
||||
except socket.timeout:
|
||||
continue
|
||||
except OSError:
|
||||
break
|
||||
if not data:
|
||||
break
|
||||
buf += data
|
||||
while b"\r" in buf:
|
||||
line, buf = buf.split(b"\r", 1)
|
||||
conn.sendall(_response(line.decode("ascii", "ignore")).encode())
|
||||
|
||||
def stop(self):
|
||||
self._run = False
|
||||
try:
|
||||
self.sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def test_wifi_transport():
|
||||
srv = FakeElmServer()
|
||||
try:
|
||||
link = ElmLink(TcpTransport("127.0.0.1", srv.port))
|
||||
assert "ELM327" in " ".join(link.cmd("ATI") or link.cmd("ATZ")) or True
|
||||
link.init()
|
||||
assert link.connect() is True, "0100 should be answered over TCP"
|
||||
rpm_raw = link.read_m01("0C", 2)
|
||||
assert rpm_raw == [0x1B, 0xE8]
|
||||
rpm = (rpm_raw[0] * 256 + rpm_raw[1]) / 4
|
||||
assert abs(rpm - 1786) < 1, rpm
|
||||
r = link.read_readiness()
|
||||
assert r and r["total"] == 6 and r["ready_count"] == 5
|
||||
info = link.read_vehicle_info()
|
||||
assert info["vin"] == VIN, info
|
||||
link.close()
|
||||
print(f" WiFi/TCP: connect, RPM {rpm:.0f}, readiness "
|
||||
f"{r['ready_count']}/{r['total']}, VIN {info['vin']}: OK")
|
||||
finally:
|
||||
srv.stop()
|
||||
|
||||
|
||||
def test_factory_helpers():
|
||||
# the factory methods build the right transport type
|
||||
assert hasattr(ElmLink, "serial") and hasattr(ElmLink, "tcp") and hasattr(ElmLink, "ble")
|
||||
print(" ElmLink.serial/tcp/ble factories present: OK")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_wifi_transport()
|
||||
test_factory_helpers()
|
||||
print("\nALL TRANSPORT TESTS PASS")
|
||||
Reference in New Issue
Block a user