Files
xterra-overland-dashboard/tests/test_pico_core.py
T
2026-06-03 02:40:17 -06:00

231 lines
6.1 KiB
Python

import sys
from pathlib import Path
# Directory one level above the folder that contains this test file.
ROOT = Path(__file__).resolve().parents[1]
# Presumably the folder holding the Pico dashboard sources -- confirm against
# the repository layout.
PICO = ROOT / "pico-dashboard"
# Put that folder first on sys.path; NOTE(review): this appears to be what
# lets the comms/state/alarms/ui imports below resolve -- confirm.
sys.path.insert(0, str(PICO))
from comms.protocol import make_status_request, make_set_relay
from state.app_state import AppState
from alarms.alarm_manager import AlarmManager
from ui.screen_manager import ScreenManager
def test_protocol_messages():
    """Check the dictionaries built by the protocol message helpers."""
    status_request = make_status_request()
    assert status_request == {"type": "status_request"}

    expected_relay_command = {
        "type": "set_relay",
        "relay": "starlink",
        "enabled": True,
    }
    assert make_set_relay("starlink", True) == expected_relay_command
def test_app_state_status_update():
    """Check that update_from_status copies each status section into AppState."""
    app_state = AppState()
    status_payload = {
        "timestamp": 123,
        "battery": {"soc": 82, "voltage": 13.2},
        "temps": {"fridge_zone_1": 34.5},
        "sensor_health": {"fridge_zone_1": True},
        "relays": {"starlink": True},
        "vehicle": {"ignition_on": False},
        "network": {"uart_connected": True},
    }

    app_state.update_from_status(status_payload)

    assert app_state.last_status_timestamp == 123
    assert app_state.battery["soc"] == 82
    assert app_state.temps["fridge_zone_1"] == 34.5
    assert app_state.sensor_health["fridge_zone_1"] is True
    assert app_state.relays["starlink"] is True
    assert app_state.network["uart_connected"] is True
def test_alarm_manager():
    """Check which alarms AlarmManager.evaluate reports for a degraded status."""
    app_state = AppState()
    degraded_status = {
        "battery": {"soc": 10, "voltage": 11.8},
        "temps": {"fridge_zone_1": 45.0, "fridge_zone_2": 35.0},
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": False,
            "outside": True,
        },
        "network": {"uart_connected": False},
    }
    app_state.update_from_status(degraded_status)

    raised = AlarmManager().evaluate(app_state)

    # Checked in a fixed order so the first missing alarm is the one reported.
    expected_alarms = (
        "battery_soc_low",
        "battery_voltage_low",
        "fridge_zone_1_high",
        "communication_lost",
        "sensor_failure:rear_seat",
    )
    for alarm_name in expected_alarms:
        assert alarm_name in raised
def test_screen_manager():
    """Walk ScreenManager through go_to / next_screen / previous_screen."""
    manager = ScreenManager()
    assert manager.current_screen == "dashboard"

    # Jump directly to a named screen.
    manager.go_to("battery")
    assert manager.current_screen == "battery"

    # Step forward one screen, then back again.
    manager.next_screen()
    assert manager.current_screen == "temps"

    manager.previous_screen()
    assert manager.current_screen == "battery"
class FakeUart:
    """In-memory UART stand-in used by the tests in this file.

    Data passed to ``write`` is collected in ``writes``; ``read`` hands back
    the entries of ``read_chunks`` one at a time, oldest first.
    """

    def __init__(self):
        self.writes = []
        self.read_chunks = []

    def write(self, data):
        """Record *data* in ``writes``."""
        self.writes.append(data)

    def any(self):
        """Return True while at least one chunk is still queued."""
        return bool(self.read_chunks)

    def read(self):
        """Pop and return the oldest queued chunk, or ``b""`` if none remain."""
        if not self.read_chunks:
            return b""
        return self.read_chunks.pop(0)
def test_uart_client_send_message():
    """Check the exact bytes UartClient.send_message writes for a message."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart_client = UartClient(uart)

    uart_client.send_message({"type": "status_request"})

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
def test_uart_client_reads_newline_delimited_json():
    """Check that two newline-terminated JSON chunks decode to two messages."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    # Incoming data is queued before the client is constructed.
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1}\n',
        b'{"type":"relay_response","relay":"fridge","enabled":true}\n',
    ]
    uart_client = UartClient(uart)

    decoded = uart_client.read_available_messages()

    expected = [
        {"type": "status_response", "timestamp": 1},
        {"type": "relay_response", "relay": "fridge", "enabled": True},
    ]
    assert decoded == expected
def test_uart_client_handles_partial_messages():
    """Check that a message split across two chunks is decoded as one."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart_client = UartClient(uart)
    # One JSON line delivered in two pieces; only the second ends in a newline.
    first_half = b'{"type":"status_'
    second_half = b'response"}\n'
    uart.read_chunks = [first_half, second_half]

    decoded = uart_client.read_available_messages()

    assert decoded == [{"type": "status_response"}]
def test_uart_client_handles_invalid_json():
    """Check that an undecodable line yields an ``invalid_json`` error message."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart.read_chunks = [b'not json\n']
    uart_client = UartClient(uart)

    decoded = uart_client.read_available_messages()

    first_message = decoded[0]
    assert first_message["type"] == "error"
    assert first_message["message"] == "invalid_json"
def test_communication_service_requests_status():
    """Check the bytes written when CommunicationService.request_status runs."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.request_status()

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
def test_communication_service_sends_relay_command():
    """Check the bytes written when CommunicationService.set_relay runs."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.set_relay("fridge", False)

    expected_writes = [
        b'{"type":"set_relay","relay":"fridge","enabled":false}\n',
    ]
    assert uart.writes == expected_writes
def test_communication_service_updates_state_from_status():
    """Check that poll() returns a status_response and applies it to AppState."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    incoming_line = b'{"type":"status_response","timestamp":99,"battery":{"soc":75},"network":{"uart_connected":true}}\n'
    uart.read_chunks = [incoming_line]
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    polled = comms_service.poll()

    assert polled[0]["type"] == "status_response"
    assert app_state.last_status_timestamp == 99
    assert app_state.battery["soc"] == 75
    assert app_state.network["uart_connected"] is True
def test_communication_service_updates_state_from_relay_response():
    """Check that poll() records a relay_response in AppState.relays."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    incoming_line = b'{"type":"relay_response","relay":"starlink","enabled":true,"ok":true}\n'
    uart.read_chunks = [incoming_line]
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.poll()

    assert app_state.relays["starlink"] is True
def test_communication_service_records_errors():
    """Check that poll() stores an incoming error message in AppState.last_error."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    incoming_line = b'{"type":"error","message":"invalid_json"}\n'
    uart.read_chunks = [incoming_line]
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.poll()

    assert app_state.last_error["message"] == "invalid_json"