From f3965e017e9a3e1369d1f026e9d13ae6723dfe4d Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 3 Jun 2026 02:44:30 -0600 Subject: [PATCH] Add Pico communication timeout tracking --- pico-dashboard/comms/communication_service.py | 44 ++++++++++- tests/test_pico_core.py | 76 +++++++++++++++++++ 2 files changed, 119 insertions(+), 1 deletion(-) diff --git a/pico-dashboard/comms/communication_service.py b/pico-dashboard/comms/communication_service.py index 2c66997..9a0699b 100644 --- a/pico-dashboard/comms/communication_service.py +++ b/pico-dashboard/comms/communication_service.py @@ -8,12 +8,32 @@ from .protocol import ( class CommunicationService: - def __init__(self, uart_client, app_state, http_client=None): + def __init__( + self, + uart_client, + app_state, + http_client=None, + clock=None, + timeout_seconds=5, + ): self.uart_client = uart_client self.http_client = http_client self.app_state = app_state + self.clock = clock + self.timeout_seconds = timeout_seconds self.last_messages = [] self.use_http_fallback = False + self.last_status_received_at = None + + def now(self): + if self.clock: + return self.clock() + + try: + import time + return time.time() + except ImportError: + return 0 def request_status(self): if self.use_http_fallback and self.http_client: @@ -40,8 +60,28 @@ class CommunicationService: for message in messages: self.handle_message(message) + self.update_connection_state() + return messages + def update_connection_state(self): + if self.last_status_received_at is None: + self.app_state.network["uart_connected"] = False + return + + age = self.now() - self.last_status_received_at + self.app_state.network["uart_connected"] = age <= self.timeout_seconds + + def should_use_http_fallback(self): + return ( + self.http_client is not None + and not self.app_state.network.get("uart_connected", False) + ) + + def auto_select_transport(self): + self.use_http_fallback = self.should_use_http_fallback() + return self.use_http_fallback + def enable_http_fallback(self): self.use_http_fallback = True @@ -50,7 +90,9 @@ class CommunicationService: def handle_message(self, message): if is_status_response(message): + self.last_status_received_at = self.now() self.app_state.update_from_status(message) + self.app_state.network["uart_connected"] = True return if is_relay_response(message): diff --git a/tests/test_pico_core.py b/tests/test_pico_core.py index bcdf7cf..1a375b4 100644 --- a/tests/test_pico_core.py +++ b/tests/test_pico_core.py @@ -350,3 +350,79 @@ def test_communication_service_http_fallback_relay(): assert state.relays["fridge"] is True assert fake_uart.writes == [] assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/on"] + + +def test_communication_service_marks_uart_connected_on_status(): + from comms.uart_client import UartClient + from comms.communication_service import CommunicationService + + fake = FakeUart() + fake.read_chunks = [ + b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n' + ] + + state = AppState() + current_time = [100.0] + + service = CommunicationService( + UartClient(fake), + state, + clock=lambda: current_time[0], + timeout_seconds=5, + ) + + service.poll() + + assert state.network["uart_connected"] is True + assert service.last_status_received_at == 100.0 + + +def test_communication_service_marks_uart_disconnected_after_timeout(): + from comms.uart_client import UartClient + from comms.communication_service import CommunicationService + + fake = FakeUart() + fake.read_chunks = [ + b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n' + ] + + state = AppState() + current_time = [100.0] + + service = CommunicationService( + UartClient(fake), + state, + clock=lambda: current_time[0], + timeout_seconds=5, + ) + + service.poll() + assert state.network["uart_connected"] is True + + current_time[0] = 106.0 + service.poll() + + assert state.network["uart_connected"] is False + + +def test_communication_service_auto_selects_http_fallback_when_uart_down(): + from comms.uart_client import UartClient + from comms.http_client import HttpClient + from comms.communication_service import CommunicationService + + fake_uart = FakeUart() + fake_requests = FakeRequests() + + state = AppState() + service = CommunicationService( + UartClient(fake_uart), + state, + HttpClient(fake_requests), + clock=lambda: 100.0, + timeout_seconds=5, + ) + + service.poll() + + assert service.auto_select_transport() is True + assert service.use_http_fallback is True