# File: xterra-overland-dashboard/tests/test_pico_core.py
# (781 lines, 20 KiB, Python)

import sys
from pathlib import Path
# Project root: the parent of the directory that contains this test file.
ROOT = Path(__file__).resolve().parents[1]
# Presumably the directory holding the packages imported below (comms, state,
# alarms, ui) -- confirm against the repository layout.
PICO = ROOT / "pico-dashboard"
# Insert at position 0 so this directory is searched first; this must run
# before the project imports that follow.
sys.path.insert(0, str(PICO))
from comms.protocol import make_status_request, make_set_relay
from state.app_state import AppState
from alarms.alarm_manager import AlarmManager
from ui.screen_manager import ScreenManager
def test_protocol_messages():
    """The protocol helpers return the expected message dicts."""
    status_msg = make_status_request()
    assert status_msg == {"type": "status_request"}

    relay_msg = make_set_relay("starlink", True)
    expected_relay_msg = {
        "type": "set_relay",
        "relay": "starlink",
        "enabled": True,
    }
    assert relay_msg == expected_relay_msg
def test_app_state_status_update():
    """update_from_status makes the payload's fields readable on AppState."""
    app_state = AppState()
    status_payload = {
        "timestamp": 123,
        "battery": {"soc": 82, "voltage": 13.2},
        "temps": {"fridge_zone_1": 34.5},
        "sensor_health": {"fridge_zone_1": True},
        "relays": {"starlink": True},
        "vehicle": {"ignition_on": False},
        "network": {"uart_connected": True},
    }

    app_state.update_from_status(status_payload)

    assert app_state.last_status_timestamp == 123
    assert app_state.battery["soc"] == 82
    assert app_state.temps["fridge_zone_1"] == 34.5
    # Flags are compared by identity: they must be the real ``True`` object.
    assert app_state.sensor_health["fridge_zone_1"] is True
    assert app_state.relays["starlink"] is True
    assert app_state.network["uart_connected"] is True
def test_alarm_manager():
    """AlarmManager.evaluate reports each alarm expected for this status."""
    app_state = AppState()
    status_payload = {
        "battery": {"soc": 10, "voltage": 11.8},
        "temps": {"fridge_zone_1": 45.0, "fridge_zone_2": 35.0},
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": False,
            "outside": True,
        },
        "network": {"uart_connected": False},
    }
    app_state.update_from_status(status_payload)

    raised = AlarmManager().evaluate(app_state)

    expected_alarms = (
        "battery_soc_low",
        "battery_voltage_low",
        "fridge_zone_1_high",
        "communication_lost",
        "sensor_failure:rear_seat",
    )
    for alarm_name in expected_alarms:
        assert alarm_name in raised
def test_screen_manager():
    """ScreenManager starts on the dashboard and steps between screens."""
    manager = ScreenManager()
    assert manager.current_screen == "dashboard"

    # Jump directly to a named screen.
    manager.go_to("battery")
    assert manager.current_screen == "battery"

    # Step forward one screen, then back again.
    manager.next_screen()
    assert manager.current_screen == "temps"

    manager.previous_screen()
    assert manager.current_screen == "battery"
class FakeUart:
    """In-memory stand-in for a UART object, used by the tests in this file.

    Attributes:
        writes: every payload passed to :meth:`write`, in call order.
        read_chunks: FIFO list of byte strings handed back by :meth:`read`.
            Tests assign this attribute directly, so it is kept a plain list.
    """

    def __init__(self):
        self.writes = []
        self.read_chunks = []

    def write(self, data):
        """Record *data* instead of transmitting it."""
        self.writes.append(data)

    def any(self):
        """Return True while at least one queued chunk is waiting to be read."""
        return bool(self.read_chunks)

    def read(self, nbytes=None):
        """Return queued bytes, or ``b""`` when nothing is queued.

        Args:
            nbytes: optional non-negative maximum number of bytes to return.
                When omitted, the whole next chunk is returned (the original
                behaviour). When given, at most ``nbytes`` bytes are taken
                from the front chunk and any remainder stays queued first.

        Returns:
            The bytes read; empty when the queue is empty.
        """
        if not self.read_chunks:
            return b""
        if nbytes is None:
            return self.read_chunks.pop(0)

        front = self.read_chunks[0]
        taken, remainder = front[:nbytes], front[nbytes:]
        if remainder:
            # Keep the unread tail at the head of the queue for the next read.
            self.read_chunks[0] = remainder
        else:
            self.read_chunks.pop(0)
        return taken
def test_uart_client_send_message():
    """send_message writes one compact, newline-terminated JSON payload."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    UartClient(uart).send_message({"type": "status_request"})

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
def test_uart_client_reads_newline_delimited_json():
    """Each newline-terminated JSON chunk is decoded into one message dict."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1}\n',
        b'{"type":"relay_response","relay":"fridge","enabled":true}\n',
    ]
    reader = UartClient(uart)

    decoded = reader.read_available_messages()

    expected = [
        {"type": "status_response", "timestamp": 1},
        {"type": "relay_response", "relay": "fridge", "enabled": True},
    ]
    assert decoded == expected
def test_uart_client_handles_partial_messages():
    """A JSON line split across two reads is reassembled into one message."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    reader = UartClient(uart)
    # Queue the data only after the client exists, split mid-message.
    uart.read_chunks = [b'{"type":"status_', b'response"}\n']

    decoded = reader.read_available_messages()

    assert decoded == [{"type": "status_response"}]
def test_uart_client_handles_invalid_json():
    """An undecodable line is reported as an ``invalid_json`` error message."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart.read_chunks = [b'not json\n']
    reader = UartClient(uart)

    first_message = reader.read_available_messages()[0]

    assert first_message["type"] == "error"
    assert first_message["message"] == "invalid_json"
def test_communication_service_requests_status():
    """request_status writes a status_request line to the UART."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.request_status()

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
def test_communication_service_sends_relay_command():
    """set_relay writes a set_relay line carrying the relay name and state."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.set_relay("fridge", False)

    expected_writes = [
        b'{"type":"set_relay","relay":"fridge","enabled":false}\n',
    ]
    assert uart.writes == expected_writes
def test_communication_service_updates_state_from_status():
    """poll() returns the status message and applies it to the AppState."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":99,"battery":{"soc":75},"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    received = comms_service.poll()

    assert received[0]["type"] == "status_response"
    assert app_state.last_status_timestamp == 99
    assert app_state.battery["soc"] == 75
    assert app_state.network["uart_connected"] is True
def test_communication_service_updates_state_from_relay_response():
    """poll() records the relay state carried by a relay_response."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"relay_response","relay":"starlink","enabled":true,"ok":true}\n'
    ]
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.poll()

    starlink_enabled = app_state.relays["starlink"]
    assert starlink_enabled is True
def test_communication_service_records_errors():
    """poll() stores an incoming error message on AppState.last_error."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [b'{"type":"error","message":"invalid_json"}\n']
    app_state = AppState()
    comms_service = CommunicationService(UartClient(uart), app_state)

    comms_service.poll()

    recorded_error = app_state.last_error
    assert recorded_error["message"] == "invalid_json"
class FakeResponse:
    """Minimal stand-in for the response object returned by ``FakeRequests.get``.

    Args:
        payload: the object :meth:`json` hands back unchanged.
        status_code: value exposed as ``status_code``; defaults to 200 so
            existing single-argument construction keeps working.

    Attributes:
        closed: False until :meth:`close` is called, letting a test check
            that the code under test released the response.
    """

    def __init__(self, payload, status_code=200):
        self.payload = payload
        self.status_code = status_code
        self.closed = False

    def json(self):
        """Return the stored payload as the decoded body."""
        return self.payload

    def close(self):
        """Mark the response as closed; there is no real resource to release."""
        self.closed = True
class FakeRequests:
    """Stand-in for an HTTP ``requests``-style module with a ``get`` function.

    ``urls`` records every requested URL in order. ``responses`` is a FIFO
    list of payloads; each ``get`` consumes the first one, and once the list
    is empty ``{"ok": True}`` is served instead.
    """

    def __init__(self):
        self.urls = []
        self.responses = []

    def get(self, url):
        """Record *url* and return a FakeResponse wrapping the next payload."""
        self.urls.append(url)
        payload = self.responses.pop(0) if self.responses else {"ok": True}
        return FakeResponse(payload)
def test_http_client_get_status():
    """get_status fetches the /status URL and returns the decoded payload."""
    from comms.http_client import HttpClient

    http = FakeRequests()
    http.responses = [
        {"type": "status_response", "battery": {"soc": 82}}
    ]

    result = HttpClient(http).get_status()

    assert http.urls == ["http://192.168.4.1/status"]
    assert result["type"] == "status_response"
    assert result["battery"]["soc"] == 82
def test_http_client_set_relay_on():
    """Enabling a relay requests the ``/relay/<name>/on`` URL."""
    from comms.http_client import HttpClient

    http = FakeRequests()

    result = HttpClient(http).set_relay("starlink", True)

    expected_urls = ["http://192.168.4.1/relay/starlink/on"]
    assert http.urls == expected_urls
    assert result == {"ok": True}
def test_http_client_set_relay_off():
    """Disabling a relay requests the ``/relay/<name>/off`` URL."""
    from comms.http_client import HttpClient

    http = FakeRequests()

    result = HttpClient(http).set_relay("fridge", False)

    expected_urls = ["http://192.168.4.1/relay/fridge/off"]
    assert http.urls == expected_urls
    assert result == {"ok": True}
def test_communication_service_http_fallback_status():
    """With HTTP fallback enabled, request_status uses HTTP and not the UART."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    http = FakeRequests()
    http.responses = [
        {"type": "status_response", "battery": {"soc": 66}}
    ]
    app_state = AppState()
    uart_client = UartClient(uart)
    http_client = HttpClient(http)
    comms_service = CommunicationService(uart_client, app_state, http_client)

    comms_service.enable_http_fallback()
    reply = comms_service.request_status()

    assert reply["type"] == "status_response"
    assert app_state.battery["soc"] == 66
    # Nothing may have gone out over the UART.
    assert uart.writes == []
    assert http.urls == ["http://192.168.4.1/status"]
def test_communication_service_http_fallback_relay():
    """With HTTP fallback enabled, set_relay uses HTTP and updates the state."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    http = FakeRequests()
    relay_reply = {
        "type": "relay_response",
        "relay": "fridge",
        "enabled": True,
        "ok": True,
    }
    http.responses = [relay_reply]
    app_state = AppState()
    uart_client = UartClient(uart)
    http_client = HttpClient(http)
    comms_service = CommunicationService(uart_client, app_state, http_client)

    comms_service.enable_http_fallback()
    reply = comms_service.set_relay("fridge", True)

    assert reply["type"] == "relay_response"
    assert app_state.relays["fridge"] is True
    # Nothing may have gone out over the UART.
    assert uart.writes == []
    assert http.urls == ["http://192.168.4.1/relay/fridge/on"]
def test_communication_service_marks_uart_connected_on_status():
    """A polled status marks the UART connected and stamps the receive time."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    # Mutable holder so the injected clock reads a test-controlled time.
    now = {"value": 100.0}

    def fake_clock():
        return now["value"]

    comms_service = CommunicationService(
        UartClient(uart),
        app_state,
        clock=fake_clock,
        timeout_seconds=5,
    )

    comms_service.poll()

    assert app_state.network["uart_connected"] is True
    assert comms_service.last_status_received_at == 100.0
def test_communication_service_marks_uart_disconnected_after_timeout():
    """A poll after the timeout has elapsed marks the UART disconnected."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    # Mutable holder so the test can advance the injected clock.
    now = {"value": 100.0}

    def fake_clock():
        return now["value"]

    comms_service = CommunicationService(
        UartClient(uart),
        app_state,
        clock=fake_clock,
        timeout_seconds=5,
    )

    # First poll consumes the queued status at t=100.
    comms_service.poll()
    assert app_state.network["uart_connected"] is True

    # Six seconds later, past the five-second timeout, with nothing queued.
    now["value"] = 106.0
    comms_service.poll()
    assert app_state.network["uart_connected"] is False
def test_communication_service_auto_selects_http_fallback_when_uart_down():
    """With no UART traffic, auto_select_transport switches to HTTP fallback."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    http = FakeRequests()
    app_state = AppState()

    def fixed_clock():
        return 100.0

    comms_service = CommunicationService(
        UartClient(uart),
        app_state,
        HttpClient(http),
        clock=fixed_clock,
        timeout_seconds=5,
    )

    comms_service.poll()
    selected = comms_service.auto_select_transport()

    assert selected is True
    assert comms_service.use_http_fallback is True
def test_dashboard_view_model_formats_dashboard_summary():
    """A populated state is exposed as formatted strings, grouped by section."""
    from ui.dashboard_view_model import DashboardViewModel

    app_state = AppState()
    status_payload = {
        "battery": {
            "soc": 82,
            "voltage": 13.2,
            "current": -6.4,
            "runtime_hours": 12.0,
        },
        "temps": {
            "fridge_zone_1": 34.5,
            "fridge_zone_2": 36.0,
        },
        "relays": {
            "starlink": False,
            "fridge": True,
        },
        "network": {
            "uart_connected": True,
        },
    }
    app_state.update_from_status(status_payload)

    summary = DashboardViewModel(app_state, alarms=[]).as_dict()

    top_bar = summary["top_bar"]
    assert top_bar["soc"] == "82%"
    assert top_bar["comms"] == "UART OK"
    assert top_bar["alarms"] == 0

    battery = summary["battery"]
    assert battery["voltage"] == "13.2V"
    assert battery["current"] == "-6.4A"
    assert battery["runtime"] == "12.0 hr"

    fridge = summary["fridge"]
    assert fridge["zone_1"] == "34.5°F"
    assert fridge["zone_2"] == "36.0°F"

    power = summary["power"]
    assert power["starlink"] == "OFF"
    assert power["fridge"] == "ON"
def test_dashboard_view_model_handles_missing_values():
    """An empty state renders ``--`` placeholders and a lost-UART top bar."""
    from ui.dashboard_view_model import DashboardViewModel

    blank_state = AppState()
    active_alarms = ["communication_lost"]

    summary = DashboardViewModel(blank_state, alarms=active_alarms).as_dict()

    top_bar = summary["top_bar"]
    assert top_bar["soc"] == "--"
    assert top_bar["comms"] == "UART LOST"
    assert top_bar["alarms"] == 1
    assert summary["battery"]["voltage"] == "--"
    assert summary["fridge"]["zone_1"] == "--"
def test_battery_view_model():
    """BatteryViewModel formats each battery reading with its unit suffix."""
    from ui.detail_view_models import BatteryViewModel

    app_state = AppState()
    battery_readings = {
        "soc": 82,
        "voltage": 13.2,
        "current": -6.4,
        "remaining_ah": 82.0,
        "runtime_hours": 12.0,
        "temperature_f": 76.0,
    }
    app_state.update_from_status({"battery": battery_readings})

    formatted = BatteryViewModel(app_state).as_dict()

    assert formatted["soc"] == "82%"
    assert formatted["voltage"] == "13.2V"
    assert formatted["current"] == "-6.4A"
    assert formatted["remaining_ah"] == "82.0Ah"
    assert formatted["runtime_hours"] == "12.0 hr"
    assert formatted["temperature_f"] == "76.0°F"
def test_temps_view_model():
    """TempsViewModel rows carry label, formatted temperature and status."""
    from ui.detail_view_models import TempsViewModel

    app_state = AppState()
    status_payload = {
        "temps": {
            "fridge_zone_1": 34.5,
            "fridge_zone_2": 36.0,
            "rear_seat": 71.2,
            "outside": None,
        },
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": True,
            "outside": False,
        },
    }
    app_state.update_from_status(status_payload)

    rows = TempsViewModel(app_state).as_list()

    # First row: a healthy sensor with a reading.
    healthy_row = rows[0]
    assert healthy_row["label"] == "Fridge Zone 1"
    assert healthy_row["temperature"] == "34.5°F"
    assert healthy_row["status"] == "OK"

    # Fourth row: no reading and an unhealthy sensor.
    faulty_row = rows[3]
    assert faulty_row["label"] == "Outside Air"
    assert faulty_row["temperature"] == "--"
    assert faulty_row["status"] == "FAULT"
def test_power_view_model():
    """PowerViewModel renders relay booleans as ``ON`` / ``OFF``."""
    from ui.detail_view_models import PowerViewModel

    app_state = AppState()
    relay_states = {
        "starlink": False,
        "fridge": True,
    }
    app_state.update_from_status({"relays": relay_states})

    formatted = PowerViewModel(app_state).as_dict()

    assert formatted["starlink"] == "OFF"
    assert formatted["fridge"] == "ON"
def test_system_view_model():
    """SystemViewModel summarises link, sensor and ignition status as text."""
    from ui.detail_view_models import SystemViewModel

    app_state = AppState()
    status_payload = {
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": False,
            "outside": True,
        },
        "vehicle": {
            "ignition_on": False,
        },
        "network": {
            "uart_connected": True,
            "wifi_enabled": True,
        },
    }
    app_state.update_from_status(status_payload)

    summary = SystemViewModel(app_state).as_dict()

    assert summary["esp32"] == "Online"
    assert summary["uart"] == "Connected"
    assert summary["wifi_api"] == "Available"
    # Three of the four sensors above report healthy.
    assert summary["sensors"] == "3 / 4 OK"
    assert summary["ignition"] == "Off"
def test_alarm_view_model_no_alarms():
    """With no alarms, every query on AlarmViewModel reports an empty state."""
    from ui.alarm_view_model import AlarmViewModel

    empty_view = AlarmViewModel([])

    assert empty_view.count() == 0
    assert empty_view.has_alarms() is False
    assert empty_view.has_critical() is False
    assert empty_view.should_buzz() is False
    assert empty_view.primary_alarm() is None
    assert empty_view.as_list() == []
def test_alarm_view_model_critical_alarm():
    """A fridge high-temperature alarm is critical and requests the buzzer."""
    from ui.alarm_view_model import AlarmViewModel

    alarm_view = AlarmViewModel(["fridge_zone_1_high"])

    assert alarm_view.count() == 1
    assert alarm_view.has_alarms() is True
    assert alarm_view.has_critical() is True
    assert alarm_view.should_buzz() is True

    top_alarm = alarm_view.primary_alarm()
    assert top_alarm["severity"] == "critical"
    assert top_alarm["title"] == "Fridge Zone 1 High Temp"
def test_alarm_view_model_sensor_failure():
    """A sensor failure is a warning: shown, not critical, no buzzer."""
    from ui.alarm_view_model import AlarmViewModel

    alarm_view = AlarmViewModel(["sensor_failure:outside"])

    assert alarm_view.has_alarms() is True
    assert alarm_view.has_critical() is False
    assert alarm_view.should_buzz() is False

    top_alarm = alarm_view.primary_alarm()
    assert top_alarm["severity"] == "warning"
    assert top_alarm["title"] == "Sensor Failure"
    assert top_alarm["detail"] == "Outside"
class FakePin:
    """In-memory stand-in for an output pin, passed to ``Buzzer`` in this file.

    Attributes:
        values: every level written through :meth:`value`, in call order.
    """

    def __init__(self):
        self.values = []

    def value(self, value=None):
        """Write a level, or read back the last one when called with no argument.

        Args:
            value: level to record. When omitted (or None) nothing is
                recorded and the call acts as a getter instead.

        Returns:
            None when writing. When reading, the most recently written level,
            or 0 if nothing has been written yet.
        """
        if value is None:
            # Getter form: must not pollute the recorded write history.
            return self.values[-1] if self.values else 0
        self.values.append(value)
        return None
def test_buzzer_on_off():
    """Buzzer.on/off flip ``enabled`` and write 1/0 to the pin."""
    from hardware.buzzer import Buzzer

    fake_pin = FakePin()
    buzzer = Buzzer(fake_pin)

    buzzer.on()
    assert buzzer.enabled is True
    last_written = fake_pin.values[-1]
    assert last_written == 1

    buzzer.off()
    assert buzzer.enabled is False
    last_written = fake_pin.values[-1]
    assert last_written == 0
def test_buzzer_updates_from_alarm_view():
    """The buzzer follows the alarm view: critical pattern on, then cleared."""
    from hardware.buzzer import Buzzer
    from ui.alarm_view_model import AlarmViewModel

    buzzer = Buzzer()

    critical_view = AlarmViewModel(["fridge_zone_1_high"])
    buzzer.update_from_alarm_view(critical_view)
    assert buzzer.enabled is True
    assert buzzer.pattern == "critical"

    cleared_view = AlarmViewModel([])
    buzzer.update_from_alarm_view(cleared_view)
    assert buzzer.enabled is False
    assert buzzer.pattern is None
def test_touch_controller_reads_event():
    """read_event exposes the device's x, y and pressed fields as attributes."""
    from hardware.touch import TouchController

    class FakeTouchDevice:
        """Device stub whose read() always reports one fixed pressed touch."""

        def read(self):
            sample = {"x": 100, "y": 200, "pressed": True}
            return sample

    touch = TouchController(FakeTouchDevice())

    touch_event = touch.read_event()

    assert touch_event.x == 100
    assert touch_event.y == 200
    assert touch_event.pressed is True
def test_touch_router_bottom_nav_changes_screen():
    """A pressed touch at (200, 460) is handled and selects the power screen."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)
    nav_press = TouchEvent(200, 460, True)

    was_handled = router.handle_touch(nav_press)

    assert was_handled is True
    assert manager.current_screen == "power"
def test_touch_router_ignores_non_nav_touch():
    """A pressed touch at (200, 200) is not handled; the screen is unchanged."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)
    body_press = TouchEvent(200, 200, True)

    was_handled = router.handle_touch(body_press)

    assert was_handled is False
    assert manager.current_screen == "dashboard"
def test_touch_router_ignores_unpressed_event():
    """An unpressed event at (200, 460) is ignored; the screen is unchanged."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)
    released_event = TouchEvent(200, 460, False)

    was_handled = router.handle_touch(released_event)

    assert was_handled is False
    assert manager.current_screen == "dashboard"
def test_display_records_commands():
    """Display records each drawing call as a tuple, in call order."""
    from hardware.display import Display

    screen = Display()

    screen.clear()
    screen.text(1, 2, "hello", size=2)
    screen.rect(3, 4, 5, 6, filled=True)

    expected_commands = [
        ("clear",),
        ("text", 1, 2, "hello", 2),
        ("rect", 3, 4, 5, 6, True),
    ]
    assert screen.commands == expected_commands
def test_dashboard_renderer_creates_draw_commands():
    """Rendering the dashboard clears the display, then draws the expected text."""
    from hardware.display import Display
    from ui.dashboard_view_model import DashboardViewModel
    from ui.renderers import DashboardRenderer

    state = AppState()
    status_payload = {
        "battery": {
            "soc": 82,
            "voltage": 13.2,
            "current": -6.4,
            "runtime_hours": 12.0,
        },
        "temps": {
            "fridge_zone_1": 34.5,
            "fridge_zone_2": 36.0,
        },
        "relays": {
            "starlink": False,
            "fridge": True,
        },
        "network": {
            "uart_connected": True,
        },
    }
    state.update_from_status(status_payload)

    display = Display()
    renderer = DashboardRenderer(display)
    view_model = DashboardViewModel(state)
    renderer.render(view_model)

    # The very first command must be the screen clear.
    first_command = display.commands[0]
    assert first_command == ("clear",)

    # Top bar.
    assert ("text", 0, 0, "SOC 82%", 1) in display.commands
    assert ("text", 120, 0, "UART OK", 1) in display.commands
    # Section headings.
    assert ("text", 0, 48, "Battery", 1) in display.commands
    assert ("text", 0, 128, "Fridge", 1) in display.commands
    assert ("text", 0, 232, "Power", 1) in display.commands
    # Bottom navigation.
    assert ("text", 0, 440, "[Dash]", 1) in display.commands