Add Pico communication timeout tracking

This commit is contained in:
2026-06-03 02:44:30 -06:00
parent ae719be590
commit f3965e017e
2 changed files with 119 additions and 1 deletions
+43 -1
View File
@@ -8,12 +8,32 @@ from .protocol import (
class CommunicationService: class CommunicationService:
def __init__(self, uart_client, app_state, http_client=None): def __init__(
self,
uart_client,
app_state,
http_client=None,
clock=None,
timeout_seconds=5,
):
self.uart_client = uart_client self.uart_client = uart_client
self.http_client = http_client self.http_client = http_client
self.app_state = app_state self.app_state = app_state
self.clock = clock
self.timeout_seconds = timeout_seconds
self.last_messages = [] self.last_messages = []
self.use_http_fallback = False self.use_http_fallback = False
self.last_status_received_at = None
def now(self):
if self.clock:
return self.clock()
try:
import time
return time.time()
except ImportError:
return 0
def request_status(self): def request_status(self):
if self.use_http_fallback and self.http_client: if self.use_http_fallback and self.http_client:
@@ -40,8 +60,28 @@ class CommunicationService:
for message in messages: for message in messages:
self.handle_message(message) self.handle_message(message)
self.update_connection_state()
return messages return messages
def update_connection_state(self):
if self.last_status_received_at is None:
self.app_state.network["uart_connected"] = False
return
age = self.now() - self.last_status_received_at
self.app_state.network["uart_connected"] = age <= self.timeout_seconds
def should_use_http_fallback(self):
return (
self.http_client is not None
and not self.app_state.network.get("uart_connected", False)
)
def auto_select_transport(self):
self.use_http_fallback = self.should_use_http_fallback()
return self.use_http_fallback
def enable_http_fallback(self): def enable_http_fallback(self):
self.use_http_fallback = True self.use_http_fallback = True
@@ -50,7 +90,9 @@ class CommunicationService:
def handle_message(self, message): def handle_message(self, message):
if is_status_response(message): if is_status_response(message):
self.last_status_received_at = self.now()
self.app_state.update_from_status(message) self.app_state.update_from_status(message)
self.app_state.network["uart_connected"] = True
return return
if is_relay_response(message): if is_relay_response(message):
+76
View File
@@ -350,3 +350,79 @@ def test_communication_service_http_fallback_relay():
assert state.relays["fridge"] is True assert state.relays["fridge"] is True
assert fake_uart.writes == [] assert fake_uart.writes == []
assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/on"] assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/on"]
def test_communication_service_marks_uart_connected_on_status():
from comms.uart_client import UartClient
from comms.communication_service import CommunicationService
fake = FakeUart()
fake.read_chunks = [
b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
]
state = AppState()
current_time = [100.0]
service = CommunicationService(
UartClient(fake),
state,
clock=lambda: current_time[0],
timeout_seconds=5,
)
service.poll()
assert state.network["uart_connected"] is True
assert service.last_status_received_at == 100.0
def test_communication_service_marks_uart_disconnected_after_timeout():
from comms.uart_client import UartClient
from comms.communication_service import CommunicationService
fake = FakeUart()
fake.read_chunks = [
b'{"type":"status_response","timestamp":1,"network":{"uart_connected":true}}\n'
]
state = AppState()
current_time = [100.0]
service = CommunicationService(
UartClient(fake),
state,
clock=lambda: current_time[0],
timeout_seconds=5,
)
service.poll()
assert state.network["uart_connected"] is True
current_time[0] = 106.0
service.poll()
assert state.network["uart_connected"] is False
def test_communication_service_auto_selects_http_fallback_when_uart_down():
from comms.uart_client import UartClient
from comms.http_client import HttpClient
from comms.communication_service import CommunicationService
fake_uart = FakeUart()
fake_requests = FakeRequests()
state = AppState()
service = CommunicationService(
UartClient(fake_uart),
state,
HttpClient(fake_requests),
clock=lambda: 100.0,
timeout_seconds=5,
)
service.poll()
assert service.auto_select_transport() is True
assert service.use_http_fallback is True