import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] PICO = ROOT / "pico-dashboard" sys.path.insert(0, str(PICO)) from comms.protocol import make_status_request, make_set_relay from state.app_state import AppState from alarms.alarm_manager import AlarmManager from ui.screen_manager import ScreenManager def test_protocol_messages(): assert make_status_request() == {"type": "status_request"} assert make_set_relay("starlink", True) == { "type": "set_relay", "relay": "starlink", "enabled": True, } def test_app_state_status_update(): state = AppState() state.update_from_status({ "timestamp": 123, "battery": {"soc": 82, "voltage": 13.2}, "temps": {"fridge_zone_1": 34.5}, "sensor_health": {"fridge_zone_1": True}, "relays": {"starlink": True}, "vehicle": {"ignition_on": False}, "network": {"uart_connected": True}, }) assert state.last_status_timestamp == 123 assert state.battery["soc"] == 82 assert state.temps["fridge_zone_1"] == 34.5 assert state.sensor_health["fridge_zone_1"] is True assert state.relays["starlink"] is True assert state.network["uart_connected"] is True def test_alarm_manager(): state = AppState() state.update_from_status({ "battery": {"soc": 10, "voltage": 11.8}, "temps": {"fridge_zone_1": 45.0, "fridge_zone_2": 35.0}, "sensor_health": { "fridge_zone_1": True, "fridge_zone_2": True, "rear_seat": False, "outside": True, }, "network": {"uart_connected": False}, }) alarms = AlarmManager().evaluate(state) assert "battery_soc_low" in alarms assert "battery_voltage_low" in alarms assert "fridge_zone_1_high" in alarms assert "communication_lost" in alarms assert "sensor_failure:rear_seat" in alarms def test_screen_manager(): screens = ScreenManager() assert screens.current_screen == "dashboard" screens.go_to("battery") assert screens.current_screen == "battery" screens.next_screen() assert screens.current_screen == "temps" screens.previous_screen() assert screens.current_screen == "battery" class FakeUart: def __init__(self): self.writes = [] self.read_chunks = [] def write(self, data): self.writes.append(data) def any(self): return len(self.read_chunks) > 0 def read(self): if self.read_chunks: return self.read_chunks.pop(0) return b"" def test_uart_client_send_message(): from comms.uart_client import UartClient fake = FakeUart() client = UartClient(fake) client.send_message({"type": "status_request"}) assert fake.writes == [b'{"type":"status_request"}\n'] def test_uart_client_reads_newline_delimited_json(): from comms.uart_client import UartClient fake = FakeUart() fake.read_chunks = [ b'{"type":"status_response","timestamp":1}\n', b'{"type":"relay_response","relay":"fridge","enabled":true}\n', ] client = UartClient(fake) messages = client.read_available_messages() assert messages == [ {"type": "status_response", "timestamp": 1}, {"type": "relay_response", "relay": "fridge", "enabled": True}, ] def test_uart_client_handles_partial_messages(): from comms.uart_client import UartClient fake = FakeUart() client = UartClient(fake) fake.read_chunks = [b'{"type":"status_', b'response"}\n'] messages = client.read_available_messages() assert messages == [{"type": "status_response"}] def test_uart_client_handles_invalid_json(): from comms.uart_client import UartClient fake = FakeUart() fake.read_chunks = [b'not json\n'] client = UartClient(fake) messages = client.read_available_messages() assert messages[0]["type"] == "error" assert messages[0]["message"] == "invalid_json" def test_communication_service_requests_status(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() state = AppState() service = CommunicationService(UartClient(fake), state) service.request_status() assert fake.writes == [b'{"type":"status_request"}\n'] def test_communication_service_sends_relay_command(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() state = AppState() service = CommunicationService(UartClient(fake), state) service.set_relay("fridge", False) assert fake.writes == [b'{"type":"set_relay","relay":"fridge","enabled":false}\n'] def test_communication_service_updates_state_from_status(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() fake.read_chunks = [ b'{"type":"status_response","timestamp":99,"battery":{"soc":75},"network":{"uart_connected":true}}\n' ] state = AppState() service = CommunicationService(UartClient(fake), state) messages = service.poll() assert messages[0]["type"] == "status_response" assert state.last_status_timestamp == 99 assert state.battery["soc"] == 75 assert state.network["uart_connected"] is True def test_communication_service_updates_state_from_relay_response(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() fake.read_chunks = [ b'{"type":"relay_response","relay":"starlink","enabled":true,"ok":true}\n' ] state = AppState() service = CommunicationService(UartClient(fake), state) service.poll() assert state.relays["starlink"] is True def test_communication_service_records_errors(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() fake.read_chunks = [b'{"type":"error","message":"invalid_json"}\n'] state = AppState() service = CommunicationService(UartClient(fake), state) service.poll() assert state.last_error["message"] == "invalid_json" class FakeResponse: def __init__(self, payload): self.payload = payload def json(self): return self.payload class FakeRequests: def __init__(self): self.urls = [] self.responses = [] def get(self, url): self.urls.append(url) if self.responses: return FakeResponse(self.responses.pop(0)) return FakeResponse({"ok": True}) def test_http_client_get_status(): from comms.http_client import HttpClient fake_requests = FakeRequests() fake_requests.responses = [ {"type": "status_response", "battery": {"soc": 82}} ] client = HttpClient(fake_requests) payload = client.get_status() assert fake_requests.urls == ["http://192.168.4.1/status"] assert payload["type"] == "status_response" assert payload["battery"]["soc"] == 82 def test_http_client_set_relay_on(): from comms.http_client import HttpClient fake_requests = FakeRequests() client = HttpClient(fake_requests) payload = client.set_relay("starlink", True) assert fake_requests.urls == ["http://192.168.4.1/relay/starlink/on"] assert payload == {"ok": True} def test_http_client_set_relay_off(): from comms.http_client import HttpClient fake_requests = FakeRequests() client = HttpClient(fake_requests) payload = client.set_relay("fridge", False) assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/off"] assert payload == {"ok": True} def test_communication_service_http_fallback_status(): from comms.uart_client import UartClient from comms.http_client import HttpClient from comms.communication_service import CommunicationService fake_uart = FakeUart() fake_requests = FakeRequests() fake_requests.responses = [ {"type": "status_response", "battery": {"soc": 66}} ] state = AppState() service = CommunicationService( UartClient(fake_uart), state, HttpClient(fake_requests), ) service.enable_http_fallback() response = service.request_status() assert response["type"] == "status_response" assert state.battery["soc"] == 66 assert fake_uart.writes == [] assert fake_requests.urls == ["http://192.168.4.1/status"] def test_communication_service_http_fallback_relay(): from comms.uart_client import UartClient from comms.http_client import HttpClient from comms.communication_service import CommunicationService fake_uart = FakeUart() fake_requests = FakeRequests() fake_requests.responses = [ { "type": "relay_response", "relay": "fridge", "enabled": True, "ok": True, } ] state = AppState() service = CommunicationService( UartClient(fake_uart), state, HttpClient(fake_requests), ) service.enable_http_fallback() response = service.set_relay("fridge", True) assert response["type"] == "relay_response" assert state.relays["fridge"] is True assert fake_uart.writes == [] assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/on"] def test_communication_service_marks_uart_connected_on_status(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() fake.read_chunks = [ b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n' ] state = AppState() current_time = [100.0] service = CommunicationService( UartClient(fake), state, clock=lambda: current_time[0], timeout_seconds=5, ) service.poll() assert state.network["uart_connected"] is True assert service.last_status_received_at == 100.0 def test_communication_service_marks_uart_disconnected_after_timeout(): from comms.uart_client import UartClient from comms.communication_service import CommunicationService fake = FakeUart() fake.read_chunks = [ b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n' ] state = AppState() current_time = [100.0] service = CommunicationService( UartClient(fake), state, clock=lambda: current_time[0], timeout_seconds=5, ) service.poll() assert state.network["uart_connected"] is True current_time[0] = 106.0 service.poll() assert state.network["uart_connected"] is False def test_communication_service_auto_selects_http_fallback_when_uart_down(): from comms.uart_client import UartClient from comms.http_client import HttpClient from comms.communication_service import CommunicationService fake_uart = FakeUart() fake_requests = FakeRequests() state = AppState() service = CommunicationService( UartClient(fake_uart), state, HttpClient(fake_requests), clock=lambda: 100.0, timeout_seconds=5, ) service.poll() assert service.auto_select_transport() is True assert service.use_http_fallback is True def test_dashboard_view_model_formats_dashboard_summary(): from ui.dashboard_view_model import DashboardViewModel state = AppState() state.update_from_status({ "battery": { "soc": 82, "voltage": 13.2, "current": -6.4, "runtime_hours": 12.0, }, "temps": { "fridge_zone_1": 34.5, "fridge_zone_2": 36.0, }, "relays": { "starlink": False, "fridge": True, }, "network": { "uart_connected": True, }, }) vm = DashboardViewModel(state, alarms=[]).as_dict() assert vm["top_bar"]["soc"] == "82%" assert vm["top_bar"]["comms"] == "UART OK" assert vm["top_bar"]["alarms"] == 0 assert vm["battery"]["voltage"] == "13.2V" assert vm["battery"]["current"] == "-6.4A" assert vm["battery"]["runtime"] == "12.0 hr" assert vm["fridge"]["zone_1"] == "34.5°F" assert vm["fridge"]["zone_2"] == "36.0°F" assert vm["power"]["starlink"] == "OFF" assert vm["power"]["fridge"] == "ON" def test_dashboard_view_model_handles_missing_values(): from ui.dashboard_view_model import DashboardViewModel state = AppState() vm = DashboardViewModel(state, alarms=["communication_lost"]).as_dict() assert vm["top_bar"]["soc"] == "--" assert vm["top_bar"]["comms"] == "UART LOST" assert vm["top_bar"]["alarms"] == 1 assert vm["battery"]["voltage"] == "--" assert vm["fridge"]["zone_1"] == "--" def test_battery_view_model(): from ui.detail_view_models import BatteryViewModel state = AppState() state.update_from_status({ "battery": { "soc": 82, "voltage": 13.2, "current": -6.4, "remaining_ah": 82.0, "runtime_hours": 12.0, "temperature_f": 76.0, } }) vm = BatteryViewModel(state).as_dict() assert vm["soc"] == "82%" assert vm["voltage"] == "13.2V" assert vm["current"] == "-6.4A" assert vm["remaining_ah"] == "82.0Ah" assert vm["runtime_hours"] == "12.0 hr" assert vm["temperature_f"] == "76.0°F" def test_temps_view_model(): from ui.detail_view_models import TempsViewModel state = AppState() state.update_from_status({ "temps": { "fridge_zone_1": 34.5, "fridge_zone_2": 36.0, "rear_seat": 71.2, "outside": None, }, "sensor_health": { "fridge_zone_1": True, "fridge_zone_2": True, "rear_seat": True, "outside": False, } }) rows = TempsViewModel(state).as_list() assert rows[0]["label"] == "Fridge Zone 1" assert rows[0]["temperature"] == "34.5°F" assert rows[0]["status"] == "OK" assert rows[3]["label"] == "Outside Air" assert rows[3]["temperature"] == "--" assert rows[3]["status"] == "FAULT" def test_power_view_model(): from ui.detail_view_models import PowerViewModel state = AppState() state.update_from_status({ "relays": { "starlink": False, "fridge": True, } }) vm = PowerViewModel(state).as_dict() assert vm["starlink"] == "OFF" assert vm["fridge"] == "ON" def test_system_view_model(): from ui.detail_view_models import SystemViewModel state = AppState() state.update_from_status({ "sensor_health": { "fridge_zone_1": True, "fridge_zone_2": True, "rear_seat": False, "outside": True, }, "vehicle": { "ignition_on": False, }, "network": { "uart_connected": True, "wifi_enabled": True, } }) vm = SystemViewModel(state).as_dict() assert vm["esp32"] == "Online" assert vm["uart"] == "Connected" assert vm["wifi_api"] == "Available" assert vm["sensors"] == "3 / 4 OK" assert vm["ignition"] == "Off" def test_alarm_view_model_no_alarms(): from ui.alarm_view_model import AlarmViewModel vm = AlarmViewModel([]) assert vm.count() == 0 assert vm.has_alarms() is False assert vm.has_critical() is False assert vm.should_buzz() is False assert vm.primary_alarm() is None assert vm.as_list() == [] def test_alarm_view_model_critical_alarm(): from ui.alarm_view_model import AlarmViewModel vm = AlarmViewModel(["fridge_zone_1_high"]) assert vm.count() == 1 assert vm.has_alarms() is True assert vm.has_critical() is True assert vm.should_buzz() is True primary = vm.primary_alarm() assert primary["severity"] == "critical" assert primary["title"] == "Fridge Zone 1 High Temp" def test_alarm_view_model_sensor_failure(): from ui.alarm_view_model import AlarmViewModel vm = AlarmViewModel(["sensor_failure:outside"]) assert vm.has_alarms() is True assert vm.has_critical() is False assert vm.should_buzz() is False alarm = vm.primary_alarm() assert alarm["severity"] == "warning" assert alarm["title"] == "Sensor Failure" assert alarm["detail"] == "Outside" class FakePin: def __init__(self): self.values = [] def value(self, value): self.values.append(value) def test_buzzer_on_off(): from hardware.buzzer import Buzzer pin = FakePin() buzzer = Buzzer(pin) buzzer.on() assert buzzer.enabled is True assert pin.values[-1] == 1 buzzer.off() assert buzzer.enabled is False assert pin.values[-1] == 0 def test_buzzer_updates_from_alarm_view(): from hardware.buzzer import Buzzer from ui.alarm_view_model import AlarmViewModel buzzer = Buzzer() buzzer.update_from_alarm_view(AlarmViewModel(["fridge_zone_1_high"])) assert buzzer.enabled is True assert buzzer.pattern == "critical" buzzer.update_from_alarm_view(AlarmViewModel([])) assert buzzer.enabled is False assert buzzer.pattern is None def test_touch_controller_reads_event(): from hardware.touch import TouchController class FakeTouchDevice: def read(self): return {"x": 100, "y": 200, "pressed": True} controller = TouchController(FakeTouchDevice()) event = controller.read_event() assert event.x == 100 assert event.y == 200 assert event.pressed is True def test_touch_router_bottom_nav_changes_screen(): from hardware.touch import TouchEvent from ui.touch_router import TouchRouter screens = ScreenManager() router = TouchRouter(screens) handled = router.handle_touch(TouchEvent(200, 460, True)) assert handled is True assert screens.current_screen == "power" def test_touch_router_ignores_non_nav_touch(): from hardware.touch import TouchEvent from ui.touch_router import TouchRouter screens = ScreenManager() router = TouchRouter(screens) handled = router.handle_touch(TouchEvent(200, 200, True)) assert handled is False assert screens.current_screen == "dashboard" def test_touch_router_ignores_unpressed_event(): from hardware.touch import TouchEvent from ui.touch_router import TouchRouter screens = ScreenManager() router = TouchRouter(screens) handled = router.handle_touch(TouchEvent(200, 460, False)) assert handled is False assert screens.current_screen == "dashboard" def test_display_records_commands(): from hardware.display import Display display = Display() display.clear() display.text(1, 2, "hello", size=2) display.rect(3, 4, 5, 6, filled=True) assert display.commands == [ ("clear",), ("text", 1, 2, "hello", 2), ("rect", 3, 4, 5, 6, True), ] def test_dashboard_renderer_creates_draw_commands(): from hardware.display import Display from ui.dashboard_view_model import DashboardViewModel from ui.renderers import DashboardRenderer state = AppState() state.update_from_status({ "battery": { "soc": 82, "voltage": 13.2, "current": -6.4, "runtime_hours": 12.0, }, "temps": { "fridge_zone_1": 34.5, "fridge_zone_2": 36.0, }, "relays": { "starlink": False, "fridge": True, }, "network": { "uart_connected": True, }, }) display = Display() renderer = DashboardRenderer(display) renderer.render(DashboardViewModel(state)) assert display.commands[0] == ("clear",) assert ("text", 0, 0, "SOC 82%", 1) in display.commands assert ("text", 120, 0, "UART OK", 1) in display.commands assert ("text", 0, 48, "Battery", 1) in display.commands assert ("text", 0, 128, "Fridge", 1) in display.commands assert ("text", 0, 232, "Power", 1) in display.commands assert ("text", 0, 440, "[Dash]", 1) in display.commands def test_pico_dashboard_app_tick_updates_alarms_and_buzzer(): from app import PicoDashboardApp from comms.uart_client import UartClient from comms.communication_service import CommunicationService from hardware.display import Display from hardware.buzzer import Buzzer from ui.renderers import DashboardRenderer from ui.touch_router import TouchRouter fake_uart = FakeUart() fake_uart.read_chunks = [ b'{"type":"status_response","battery":{"soc":10,"voltage":11.8},"temps":{"fridge_zone_1":45.0},"sensor_health":{"fridge_zone_1":true,"fridge_zone_2":true,"rear_seat":true,"outside":true},"network":{"uart_connected":true}}\n' ] state = AppState() comms = CommunicationService(UartClient(fake_uart), state) screens = ScreenManager() display = Display() buzzer = Buzzer() app = PicoDashboardApp( app_state=state, communication_service=comms, screen_manager=screens, touch_router=TouchRouter(screens), display=display, dashboard_renderer=DashboardRenderer(display), buzzer=buzzer, ) app.tick() assert "battery_soc_low" in app.alarms assert "battery_voltage_low" in app.alarms assert "fridge_zone_1_high" in app.alarms assert buzzer.enabled is True assert display.commands def test_pico_main_builds_app(): import main app = main.build_app() assert app.state is not None assert app.comms is not None assert app.screen_manager.current_screen == "dashboard" def test_pico_main_runs_one_tick(): import main app = main.main() assert app is not None