820 lines
21 KiB
Python
820 lines
21 KiB
Python
import sys
|
|
from pathlib import Path
|
|
|
|
# Second ancestor of this file's resolved path, i.e. the parent of the
# directory that contains this test module.
ROOT = Path(__file__).resolve().parents[1]
# The "pico-dashboard" directory directly under ROOT.
PICO = ROOT.joinpath("pico-dashboard")
# Put that directory first on sys.path; presumably this is what lets the
# top-level `comms`, `state`, `alarms` and `ui` imports that follow resolve.
sys.path.insert(0, str(PICO))
|
|
|
|
from comms.protocol import make_status_request, make_set_relay
|
|
from state.app_state import AppState
|
|
from alarms.alarm_manager import AlarmManager
|
|
from ui.screen_manager import ScreenManager
|
|
|
|
|
|
def test_protocol_messages():
    """The protocol builders return the expected plain-dict messages."""
    status_msg = make_status_request()
    assert status_msg == {"type": "status_request"}

    relay_msg = make_set_relay("starlink", True)
    expected_relay_msg = {"type": "set_relay", "relay": "starlink", "enabled": True}
    assert relay_msg == expected_relay_msg
|
|
|
|
|
|
def test_app_state_status_update():
    """Sections of a status payload are readable back from AppState."""
    app_state = AppState()
    payload = {
        "timestamp": 123,
        "battery": {"soc": 82, "voltage": 13.2},
        "temps": {"fridge_zone_1": 34.5},
        "sensor_health": {"fridge_zone_1": True},
        "relays": {"starlink": True},
        "vehicle": {"ignition_on": False},
        "network": {"uart_connected": True},
    }

    app_state.update_from_status(payload)

    assert app_state.last_status_timestamp == 123
    assert app_state.battery["soc"] == 82
    assert app_state.temps["fridge_zone_1"] == 34.5
    # Boolean sections are checked by identity, not mere truthiness.
    assert app_state.sensor_health["fridge_zone_1"] is True
    assert app_state.relays["starlink"] is True
    assert app_state.network["uart_connected"] is True
|
|
|
|
|
|
def test_alarm_manager():
    """AlarmManager.evaluate reports every alarm this degraded status should raise."""
    app_state = AppState()
    degraded_status = {
        "battery": {"soc": 10, "voltage": 11.8},
        "temps": {"fridge_zone_1": 45.0, "fridge_zone_2": 35.0},
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": False,
            "outside": True,
        },
        "network": {"uart_connected": False},
    }
    app_state.update_from_status(degraded_status)

    alarms = AlarmManager().evaluate(app_state)

    expected_alarms = (
        "battery_soc_low",
        "battery_voltage_low",
        "fridge_zone_1_high",
        "communication_lost",
        "sensor_failure:rear_seat",
    )
    for alarm_name in expected_alarms:
        assert alarm_name in alarms
|
|
|
|
|
|
def test_screen_manager():
    """ScreenManager starts on the dashboard and supports direct and relative moves."""
    manager = ScreenManager()
    assert manager.current_screen == "dashboard"

    # Jump straight to a named screen.
    manager.go_to("battery")
    assert manager.current_screen == "battery"

    # Step forward one screen, then back again.
    manager.next_screen()
    assert manager.current_screen == "temps"

    manager.previous_screen()
    assert manager.current_screen == "battery"
|
|
|
|
|
|
class FakeUart:
    """In-memory stand-in for a UART object, used by the tests in this file.

    Attributes:
        writes: every payload passed to ``write``, in call order.
        read_chunks: queued payloads that ``read`` returns one per call,
            oldest first. Tests assign this list directly.
    """

    def __init__(self):
        self.writes = []
        self.read_chunks = []

    def write(self, data):
        """Record *data* instead of transmitting it."""
        self.writes.append(data)

    def any(self):
        """Return True while at least one queued chunk is waiting to be read."""
        # Same truthiness test that read() uses, so the two methods can never
        # disagree about whether the queue is empty.
        return bool(self.read_chunks)

    def read(self):
        """Pop and return the oldest queued chunk, or ``b""`` when none is left."""
        if not self.read_chunks:
            return b""
        return self.read_chunks.pop(0)
|
|
|
|
|
|
def test_uart_client_send_message():
    """send_message writes one compact, newline-terminated JSON line to the UART."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    client = UartClient(uart)

    client.send_message({"type": "status_request"})

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
|
|
|
|
|
|
def test_uart_client_reads_newline_delimited_json():
    """Each newline-terminated chunk is decoded into its own message dict."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1}\n',
        b'{"type":"relay_response","relay":"fridge","enabled":true}\n',
    ]
    client = UartClient(uart)

    decoded = client.read_available_messages()

    expected = [
        {"type": "status_response", "timestamp": 1},
        {"type": "relay_response", "relay": "fridge", "enabled": True},
    ]
    assert decoded == expected
|
|
|
|
|
|
def test_uart_client_handles_partial_messages():
    """A message split across two reads is reassembled into one dict."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    client = UartClient(uart)

    # The JSON line arrives in two pieces; only the second carries the newline.
    uart.read_chunks = [b'{"type":"status_', b'response"}\n']

    decoded = client.read_available_messages()

    assert decoded == [{"type": "status_response"}]
|
|
|
|
|
|
def test_uart_client_handles_invalid_json():
    """An unparseable line is reported as an ``invalid_json`` error message."""
    from comms.uart_client import UartClient

    uart = FakeUart()
    uart.read_chunks = [b'not json\n']
    client = UartClient(uart)

    decoded = client.read_available_messages()

    first = decoded[0]
    assert first["type"] == "error"
    assert first["message"] == "invalid_json"
|
|
|
|
|
|
def test_communication_service_requests_status():
    """With no HTTP client supplied, request_status writes a status_request line to the UART."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)

    service.request_status()

    expected_writes = [b'{"type":"status_request"}\n']
    assert uart.writes == expected_writes
|
|
|
|
|
|
def test_communication_service_sends_relay_command():
    """With no HTTP client supplied, set_relay writes one set_relay JSON line to the UART."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)

    service.set_relay("fridge", False)

    expected_writes = [b'{"type":"set_relay","relay":"fridge","enabled":false}\n']
    assert uart.writes == expected_writes
|
|
|
|
|
|
def test_communication_service_updates_state_from_status():
    """poll() returns the received status_response and copies it into AppState."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":99,"battery":{"soc":75},"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)

    polled = service.poll()

    assert polled[0]["type"] == "status_response"
    assert app_state.last_status_timestamp == 99
    assert app_state.battery["soc"] == 75
    assert app_state.network["uart_connected"] is True
|
|
|
|
|
|
def test_communication_service_updates_state_from_relay_response():
    """A relay_response seen by poll() is reflected in AppState.relays."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"relay_response","relay":"starlink","enabled":true,"ok":true}\n'
    ]
    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)

    service.poll()

    assert app_state.relays["starlink"] is True
|
|
|
|
|
|
def test_communication_service_records_errors():
    """An error message seen by poll() is kept on AppState.last_error."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [b'{"type":"error","message":"invalid_json"}\n']
    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)

    service.poll()

    recorded = app_state.last_error
    assert recorded["message"] == "invalid_json"
|
|
|
|
|
|
class FakeResponse:
    """Minimal response double whose ``json()`` returns a preset payload."""

    def __init__(self, payload):
        # Stored as-is; json() returns this very object, not a copy.
        self.payload = payload

    def json(self):
        """Return the payload given at construction time."""
        return self.payload
|
|
|
|
|
|
class FakeRequests:
    """Double for a requests-style module that records URLs and replays canned payloads.

    Attributes:
        urls: every URL passed to ``get``, in call order.
        responses: queued payloads; each ``get`` consumes the oldest one.
    """

    def __init__(self):
        self.urls = []
        self.responses = []

    def get(self, url):
        """Record *url* and return a FakeResponse for the next queued payload.

        When no payload is queued, the response carries ``{"ok": True}``.
        """
        self.urls.append(url)
        payload = self.responses.pop(0) if self.responses else {"ok": True}
        return FakeResponse(payload)
|
|
|
|
|
|
def test_http_client_get_status():
    """get_status fetches the /status URL and returns the decoded payload."""
    from comms.http_client import HttpClient

    requests_stub = FakeRequests()
    requests_stub.responses = [
        {"type": "status_response", "battery": {"soc": 82}}
    ]
    client = HttpClient(requests_stub)

    result = client.get_status()

    assert requests_stub.urls == ["http://192.168.4.1/status"]
    assert result["type"] == "status_response"
    assert result["battery"]["soc"] == 82
|
|
|
|
|
|
def test_http_client_set_relay_on():
    """set_relay(name, True) requests the relay's ``/on`` URL."""
    from comms.http_client import HttpClient

    requests_stub = FakeRequests()
    client = HttpClient(requests_stub)

    result = client.set_relay("starlink", True)

    assert requests_stub.urls == ["http://192.168.4.1/relay/starlink/on"]
    # No payload was queued, so the stub's default payload comes back.
    assert result == {"ok": True}
|
|
|
|
|
|
def test_http_client_set_relay_off():
    """set_relay(name, False) requests the relay's ``/off`` URL."""
    from comms.http_client import HttpClient

    requests_stub = FakeRequests()
    client = HttpClient(requests_stub)

    result = client.set_relay("fridge", False)

    assert requests_stub.urls == ["http://192.168.4.1/relay/fridge/off"]
    # No payload was queued, so the stub's default payload comes back.
    assert result == {"ok": True}
|
|
|
|
|
|
def test_communication_service_http_fallback_status():
    """With HTTP fallback enabled, request_status goes over HTTP and leaves the UART untouched."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    requests_stub = FakeRequests()
    requests_stub.responses = [
        {"type": "status_response", "battery": {"soc": 66}}
    ]
    app_state = AppState()
    service = CommunicationService(
        UartClient(uart),
        app_state,
        HttpClient(requests_stub),
    )

    service.enable_http_fallback()
    reply = service.request_status()

    assert reply["type"] == "status_response"
    assert app_state.battery["soc"] == 66
    # Nothing may have been written to the UART in fallback mode.
    assert uart.writes == []
    assert requests_stub.urls == ["http://192.168.4.1/status"]
|
|
|
|
|
|
def test_communication_service_http_fallback_relay():
    """With HTTP fallback enabled, set_relay goes over HTTP and updates AppState.relays."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    requests_stub = FakeRequests()
    canned_reply = {
        "type": "relay_response",
        "relay": "fridge",
        "enabled": True,
        "ok": True,
    }
    requests_stub.responses = [canned_reply]
    app_state = AppState()
    service = CommunicationService(
        UartClient(uart),
        app_state,
        HttpClient(requests_stub),
    )

    service.enable_http_fallback()
    reply = service.set_relay("fridge", True)

    assert reply["type"] == "relay_response"
    assert app_state.relays["fridge"] is True
    # Nothing may have been written to the UART in fallback mode.
    assert uart.writes == []
    assert requests_stub.urls == ["http://192.168.4.1/relay/fridge/on"]
|
|
|
|
|
|
def test_communication_service_marks_uart_connected_on_status():
    """Receiving a status stamps last_status_received_at with the injected clock value."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    # One-element list so the lambda below reads a mutable "current time".
    fake_now = [100.0]
    service = CommunicationService(
        UartClient(uart),
        app_state,
        clock=lambda: fake_now[0],
        timeout_seconds=5,
    )

    service.poll()

    assert app_state.network["uart_connected"] is True
    assert service.last_status_received_at == 100.0
|
|
|
|
|
|
def test_communication_service_marks_uart_disconnected_after_timeout():
    """A poll later than timeout_seconds after the last status clears uart_connected."""
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    uart.read_chunks = [
        b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
    ]
    app_state = AppState()
    # One-element list so the test can advance the injected clock.
    fake_now = [100.0]
    service = CommunicationService(
        UartClient(uart),
        app_state,
        clock=lambda: fake_now[0],
        timeout_seconds=5,
    )

    # First poll at t=100.0 consumes the queued status.
    service.poll()
    assert app_state.network["uart_connected"] is True

    # Second poll at t=106.0, 6 s later with a 5 s timeout and nothing queued.
    fake_now[0] = 106.0
    service.poll()

    assert app_state.network["uart_connected"] is False
|
|
|
|
|
|
def test_communication_service_auto_selects_http_fallback_when_uart_down():
    """With no status ever received over UART, auto_select_transport picks HTTP fallback."""
    from comms.uart_client import UartClient
    from comms.http_client import HttpClient
    from comms.communication_service import CommunicationService

    uart = FakeUart()
    requests_stub = FakeRequests()
    app_state = AppState()
    service = CommunicationService(
        UartClient(uart),
        app_state,
        HttpClient(requests_stub),
        clock=lambda: 100.0,
        timeout_seconds=5,
    )

    # The fake UART has nothing queued, so this poll receives no messages.
    service.poll()

    selected = service.auto_select_transport()
    assert selected is True
    assert service.use_http_fallback is True
|
|
|
|
|
|
def test_dashboard_view_model_formats_dashboard_summary():
    """DashboardViewModel.as_dict formats populated state into display strings."""
    from ui.dashboard_view_model import DashboardViewModel

    app_state = AppState()
    status = {
        "battery": {
            "soc": 82,
            "voltage": 13.2,
            "current": -6.4,
            "runtime_hours": 12.0,
        },
        "temps": {"fridge_zone_1": 34.5, "fridge_zone_2": 36.0},
        "relays": {"starlink": False, "fridge": True},
        "network": {"uart_connected": True},
    }
    app_state.update_from_status(status)

    vm = DashboardViewModel(app_state, alarms=[]).as_dict()

    top_bar = vm["top_bar"]
    assert top_bar["soc"] == "82%"
    assert top_bar["comms"] == "UART OK"
    assert top_bar["alarms"] == 0

    battery = vm["battery"]
    assert battery["voltage"] == "13.2V"
    assert battery["current"] == "-6.4A"
    assert battery["runtime"] == "12.0 hr"

    fridge = vm["fridge"]
    assert fridge["zone_1"] == "34.5°F"
    assert fridge["zone_2"] == "36.0°F"

    power = vm["power"]
    assert power["starlink"] == "OFF"
    assert power["fridge"] == "ON"
|
|
|
|
|
|
def test_dashboard_view_model_handles_missing_values():
    """With a fresh AppState, missing readings render as ``--`` placeholders."""
    from ui.dashboard_view_model import DashboardViewModel

    empty_state = AppState()

    vm = DashboardViewModel(empty_state, alarms=["communication_lost"]).as_dict()

    top_bar = vm["top_bar"]
    assert top_bar["soc"] == "--"
    assert top_bar["comms"] == "UART LOST"
    # One alarm was passed in, so the count is 1.
    assert top_bar["alarms"] == 1

    assert vm["battery"]["voltage"] == "--"
    assert vm["fridge"]["zone_1"] == "--"
|
|
|
|
|
|
def test_battery_view_model():
    """BatteryViewModel.as_dict appends the expected unit to each battery field."""
    from ui.detail_view_models import BatteryViewModel

    app_state = AppState()
    battery_status = {
        "soc": 82,
        "voltage": 13.2,
        "current": -6.4,
        "remaining_ah": 82.0,
        "runtime_hours": 12.0,
        "temperature_f": 76.0,
    }
    app_state.update_from_status({"battery": battery_status})

    vm = BatteryViewModel(app_state).as_dict()

    assert vm["soc"] == "82%"
    assert vm["voltage"] == "13.2V"
    assert vm["current"] == "-6.4A"
    assert vm["remaining_ah"] == "82.0Ah"
    assert vm["runtime_hours"] == "12.0 hr"
    assert vm["temperature_f"] == "76.0°F"
|
|
|
|
|
|
def test_temps_view_model():
    """TempsViewModel.as_list gives labelled rows, showing a failed sensor as ``--``/FAULT."""
    from ui.detail_view_models import TempsViewModel

    app_state = AppState()
    status = {
        "temps": {
            "fridge_zone_1": 34.5,
            "fridge_zone_2": 36.0,
            "rear_seat": 71.2,
            "outside": None,
        },
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": True,
            "outside": False,
        },
    }
    app_state.update_from_status(status)

    rows = TempsViewModel(app_state).as_list()

    # First row: a healthy sensor with a reading.
    healthy_row = rows[0]
    assert healthy_row["label"] == "Fridge Zone 1"
    assert healthy_row["temperature"] == "34.5°F"
    assert healthy_row["status"] == "OK"

    # Fourth row: the sensor reported unhealthy with no reading.
    faulty_row = rows[3]
    assert faulty_row["label"] == "Outside Air"
    assert faulty_row["temperature"] == "--"
    assert faulty_row["status"] == "FAULT"
|
|
|
|
|
|
def test_power_view_model():
    """PowerViewModel.as_dict renders relay booleans as ON/OFF strings."""
    from ui.detail_view_models import PowerViewModel

    app_state = AppState()
    relay_status = {"starlink": False, "fridge": True}
    app_state.update_from_status({"relays": relay_status})

    vm = PowerViewModel(app_state).as_dict()

    assert vm["starlink"] == "OFF"
    assert vm["fridge"] == "ON"
|
|
|
|
|
|
def test_system_view_model():
    """SystemViewModel.as_dict summarises link, sensor and ignition status as text."""
    from ui.detail_view_models import SystemViewModel

    app_state = AppState()
    status = {
        "sensor_health": {
            "fridge_zone_1": True,
            "fridge_zone_2": True,
            "rear_seat": False,
            "outside": True,
        },
        "vehicle": {"ignition_on": False},
        "network": {"uart_connected": True, "wifi_enabled": True},
    }
    app_state.update_from_status(status)

    vm = SystemViewModel(app_state).as_dict()

    assert vm["esp32"] == "Online"
    assert vm["uart"] == "Connected"
    assert vm["wifi_api"] == "Available"
    # Three of the four sensors above are flagged healthy.
    assert vm["sensors"] == "3 / 4 OK"
    assert vm["ignition"] == "Off"
|
|
|
|
|
|
def test_alarm_view_model_no_alarms():
    """An AlarmViewModel built from an empty list reports nothing at all."""
    from ui.alarm_view_model import AlarmViewModel

    empty_view = AlarmViewModel([])

    assert empty_view.count() == 0
    assert empty_view.has_alarms() is False
    assert empty_view.has_critical() is False
    assert empty_view.should_buzz() is False
    assert empty_view.primary_alarm() is None
    assert empty_view.as_list() == []
|
|
|
|
|
|
def test_alarm_view_model_critical_alarm():
    """``fridge_zone_1_high`` is treated as a critical alarm that should buzz."""
    from ui.alarm_view_model import AlarmViewModel

    view = AlarmViewModel(["fridge_zone_1_high"])

    assert view.count() == 1
    assert view.has_alarms() is True
    assert view.has_critical() is True
    assert view.should_buzz() is True

    top_alarm = view.primary_alarm()

    assert top_alarm["severity"] == "critical"
    assert top_alarm["title"] == "Fridge Zone 1 High Temp"
|
|
|
|
|
|
def test_alarm_view_model_sensor_failure():
    """A ``sensor_failure:<name>`` alarm is a non-buzzing warning naming the sensor."""
    from ui.alarm_view_model import AlarmViewModel

    view = AlarmViewModel(["sensor_failure:outside"])

    assert view.has_alarms() is True
    assert view.has_critical() is False
    assert view.should_buzz() is False

    top_alarm = view.primary_alarm()

    assert top_alarm["severity"] == "warning"
    assert top_alarm["title"] == "Sensor Failure"
    assert top_alarm["detail"] == "Outside"
|
|
|
|
|
|
class FakePin:
    """Pin double that remembers every level written to it.

    Attributes:
        values: the arguments passed to ``value``, in call order.
    """

    def __init__(self):
        self.values = []

    def value(self, value):
        """Record *value* as the most recent level written to the pin."""
        history = self.values
        history.append(value)
|
|
|
|
|
|
def test_buzzer_on_off():
    """on()/off() toggle Buzzer.enabled and write 1/0 to the pin."""
    from hardware.buzzer import Buzzer

    fake_pin = FakePin()
    buzzer = Buzzer(fake_pin)

    buzzer.on()
    assert buzzer.enabled is True
    # The most recent write to the pin is the "on" level.
    assert fake_pin.values[-1] == 1

    buzzer.off()
    assert buzzer.enabled is False
    assert fake_pin.values[-1] == 0
|
|
|
|
|
|
def test_buzzer_updates_from_alarm_view():
    """The buzzer follows the alarm view: critical pattern when alarmed, off when clear."""
    from hardware.buzzer import Buzzer
    from ui.alarm_view_model import AlarmViewModel

    # Constructed without a pin argument.
    buzzer = Buzzer()

    alarmed_view = AlarmViewModel(["fridge_zone_1_high"])
    buzzer.update_from_alarm_view(alarmed_view)

    assert buzzer.enabled is True
    assert buzzer.pattern == "critical"

    cleared_view = AlarmViewModel([])
    buzzer.update_from_alarm_view(cleared_view)

    assert buzzer.enabled is False
    assert buzzer.pattern is None
|
|
|
|
|
|
def test_touch_controller_reads_event():
    """read_event exposes the device's dict reading as x/y/pressed attributes."""
    from hardware.touch import TouchController

    class FakeTouchDevice:
        """Touch device double that always reports the same pressed point."""

        def read(self):
            return {"x": 100, "y": 200, "pressed": True}

    device = FakeTouchDevice()
    controller = TouchController(device)

    touch = controller.read_event()

    assert touch.x == 100
    assert touch.y == 200
    assert touch.pressed is True
|
|
|
|
|
|
def test_touch_router_bottom_nav_changes_screen():
    """A pressed touch at (200, 460) is handled and switches the screen to ``power``."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)

    nav_touch = TouchEvent(200, 460, True)
    was_handled = router.handle_touch(nav_touch)

    assert was_handled is True
    assert manager.current_screen == "power"
|
|
|
|
|
|
def test_touch_router_ignores_non_nav_touch():
    """A pressed touch at (200, 200) is not handled and leaves the screen unchanged."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)

    body_touch = TouchEvent(200, 200, True)
    was_handled = router.handle_touch(body_touch)

    assert was_handled is False
    assert manager.current_screen == "dashboard"
|
|
|
|
|
|
def test_touch_router_ignores_unpressed_event():
    """An event at (200, 460) with pressed=False is ignored."""
    from hardware.touch import TouchEvent
    from ui.touch_router import TouchRouter

    manager = ScreenManager()
    router = TouchRouter(manager)

    # Same coordinates as the handled nav touch, but not pressed.
    released_touch = TouchEvent(200, 460, False)
    was_handled = router.handle_touch(released_touch)

    assert was_handled is False
    assert manager.current_screen == "dashboard"
|
|
|
|
|
|
def test_display_records_commands():
    """Display logs each drawing call as a tuple in ``commands``, in call order."""
    from hardware.display import Display

    screen = Display()

    screen.clear()
    screen.text(1, 2, "hello", size=2)
    screen.rect(3, 4, 5, 6, filled=True)

    expected_log = [
        ("clear",),
        ("text", 1, 2, "hello", 2),
        ("rect", 3, 4, 5, 6, True),
    ]
    assert screen.commands == expected_log
|
|
|
|
|
|
def test_dashboard_renderer_creates_draw_commands():
    """DashboardRenderer.render clears first, then emits the expected text commands."""
    from hardware.display import Display
    from ui.dashboard_view_model import DashboardViewModel
    from ui.renderers import DashboardRenderer

    app_state = AppState()
    status = {
        "battery": {
            "soc": 82,
            "voltage": 13.2,
            "current": -6.4,
            "runtime_hours": 12.0,
        },
        "temps": {"fridge_zone_1": 34.5, "fridge_zone_2": 36.0},
        "relays": {"starlink": False, "fridge": True},
        "network": {"uart_connected": True},
    }
    app_state.update_from_status(status)

    screen = Display()
    renderer = DashboardRenderer(screen)

    # The view model is built without an explicit alarms argument here.
    renderer.render(DashboardViewModel(app_state))

    assert screen.commands[0] == ("clear",)

    expected_text_commands = (
        ("text", 0, 0, "SOC 82%", 1),
        ("text", 120, 0, "UART OK", 1),
        ("text", 0, 48, "Battery", 1),
        ("text", 0, 128, "Fridge", 1),
        ("text", 0, 232, "Power", 1),
        ("text", 0, 440, "[Dash]", 1),
    )
    for command in expected_text_commands:
        assert command in screen.commands
|
|
|
|
|
|
def test_pico_dashboard_app_tick_updates_alarms_and_buzzer():
    """One app tick on an alarming status sets alarms, enables the buzzer and draws."""
    from app import PicoDashboardApp
    from comms.uart_client import UartClient
    from comms.communication_service import CommunicationService
    from hardware.display import Display
    from hardware.buzzer import Buzzer
    from ui.renderers import DashboardRenderer
    from ui.touch_router import TouchRouter

    uart = FakeUart()
    # One queued status: low SOC, low voltage, hot fridge zone 1, sensors healthy.
    uart.read_chunks = [
        b'{"type":"status_response","battery":{"soc":10,"voltage":11.8},"temps":{"fridge_zone_1":45.0},"sensor_health":{"fridge_zone_1":true,"fridge_zone_2":true,"rear_seat":true,"outside":true},"network":{"uart_connected":true}}\n'
    ]

    app_state = AppState()
    service = CommunicationService(UartClient(uart), app_state)
    manager = ScreenManager()
    screen = Display()
    buzzer = Buzzer()

    app = PicoDashboardApp(
        app_state=app_state,
        communication_service=service,
        screen_manager=manager,
        touch_router=TouchRouter(manager),
        display=screen,
        dashboard_renderer=DashboardRenderer(screen),
        buzzer=buzzer,
    )

    app.tick()

    for alarm_name in ("battery_soc_low", "battery_voltage_low", "fridge_zone_1_high"):
        assert alarm_name in app.alarms
    assert buzzer.enabled is True
    # At least one draw command must have been recorded.
    assert screen.commands
|