Add HTTP fallback mode to Pico communication service

This commit is contained in:
2026-06-03 02:43:42 -06:00
parent c3b786073a
commit ae719be590
2 changed files with 80 additions and 1 deletions
+21 -1
View File
@@ -8,16 +8,30 @@ from .protocol import (
class CommunicationService:
def __init__(self, uart_client, app_state):
def __init__(self, uart_client, app_state, http_client=None):
self.uart_client = uart_client
self.http_client = http_client
self.app_state = app_state
self.last_messages = []
self.use_http_fallback = False
def request_status(self):
if self.use_http_fallback and self.http_client:
message = self.http_client.get_status()
self.handle_message(message)
return message
self.uart_client.send_message(make_status_request())
return None
def set_relay(self, relay, enabled):
if self.use_http_fallback and self.http_client:
message = self.http_client.set_relay(relay, enabled)
self.handle_message(message)
return message
self.uart_client.send_message(make_set_relay(relay, enabled))
return None
def poll(self):
messages = self.uart_client.read_available_messages()
@@ -28,6 +42,12 @@ class CommunicationService:
return messages
def enable_http_fallback(self):
self.use_http_fallback = True
def disable_http_fallback(self):
self.use_http_fallback = False
def handle_message(self, message):
if is_status_response(message):
self.app_state.update_from_status(message)
+59
View File
@@ -291,3 +291,62 @@ def test_http_client_set_relay_off():
assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/off"]
assert payload == {"ok": True}
def test_communication_service_http_fallback_status():
from comms.uart_client import UartClient
from comms.http_client import HttpClient
from comms.communication_service import CommunicationService
fake_uart = FakeUart()
fake_requests = FakeRequests()
fake_requests.responses = [
{"type": "status_response", "battery": {"soc": 66}}
]
state = AppState()
service = CommunicationService(
UartClient(fake_uart),
state,
HttpClient(fake_requests),
)
service.enable_http_fallback()
response = service.request_status()
assert response["type"] == "status_response"
assert state.battery["soc"] == 66
assert fake_uart.writes == []
assert fake_requests.urls == ["http://192.168.4.1/status"]
def test_communication_service_http_fallback_relay():
from comms.uart_client import UartClient
from comms.http_client import HttpClient
from comms.communication_service import CommunicationService
fake_uart = FakeUart()
fake_requests = FakeRequests()
fake_requests.responses = [
{
"type": "relay_response",
"relay": "fridge",
"enabled": True,
"ok": True,
}
]
state = AppState()
service = CommunicationService(
UartClient(fake_uart),
state,
HttpClient(fake_requests),
)
service.enable_http_fallback()
response = service.set_relay("fridge", True)
assert response["type"] == "relay_response"
assert state.relays["fridge"] is True
assert fake_uart.writes == []
assert fake_requests.urls == ["http://192.168.4.1/relay/fridge/on"]