From 9189cc7343f5c41b455a0ae64bf70c8d87a34183 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 3 Jun 2026 02:39:30 -0600 Subject: [PATCH] Add Pico UART client and tests --- pico-dashboard/comms/uart_client.py | 40 ++++++++++++++++ tests/test_pico_core.py | 72 +++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/pico-dashboard/comms/uart_client.py b/pico-dashboard/comms/uart_client.py index e69de29..901bd61 100644 --- a/pico-dashboard/comms/uart_client.py +++ b/pico-dashboard/comms/uart_client.py @@ -0,0 +1,40 @@ +import json + + +class UartClient: + def __init__(self, uart): + self.uart = uart + self.buffer = "" + + def send_message(self, message): + payload = json.dumps(message, separators=(",", ":")) + "\n" + self.uart.write(payload.encode("utf-8")) + + def read_available_messages(self): + messages = [] + + while self.uart.any(): + chunk = self.uart.read() + + if not chunk: + break + + self.buffer += chunk.decode("utf-8") + + while "\n" in self.buffer: + line, self.buffer = self.buffer.split("\n", 1) + line = line.strip() + + if not line: + continue + + try: + messages.append(json.loads(line)) + except ValueError: + messages.append({ + "type": "error", + "message": "invalid_json", + "raw": line, + }) + + return messages diff --git a/tests/test_pico_core.py b/tests/test_pico_core.py index bdd1c89..1b121a4 100644 --- a/tests/test_pico_core.py +++ b/tests/test_pico_core.py @@ -78,3 +78,75 @@ def test_screen_manager(): screens.previous_screen() assert screens.current_screen == "battery" + + +class FakeUart: + def __init__(self): + self.writes = [] + self.read_chunks = [] + + def write(self, data): + self.writes.append(data) + + def any(self): + return len(self.read_chunks) > 0 + + def read(self): + if self.read_chunks: + return self.read_chunks.pop(0) + return b"" + + +def test_uart_client_send_message(): + from comms.uart_client import UartClient + + fake = FakeUart() + client = UartClient(fake) + + client.send_message({"type": "status_request"}) + + assert fake.writes == [b'{"type":"status_request"}\n'] + + +def test_uart_client_reads_newline_delimited_json(): + from comms.uart_client import UartClient + + fake = FakeUart() + fake.read_chunks = [ + b'{"type":"status_response","timestamp":1}\n', + b'{"type":"relay_response","relay":"fridge","enabled":true}\n', + ] + + client = UartClient(fake) + messages = client.read_available_messages() + + assert messages == [ + {"type": "status_response", "timestamp": 1}, + {"type": "relay_response", "relay": "fridge", "enabled": True}, + ] + + +def test_uart_client_handles_partial_messages(): + from comms.uart_client import UartClient + + fake = FakeUart() + client = UartClient(fake) + + fake.read_chunks = [b'{"type":"status_', b'response"}\n'] + + messages = client.read_available_messages() + + assert messages == [{"type": "status_response"}] + + +def test_uart_client_handles_invalid_json(): + from comms.uart_client import UartClient + + fake = FakeUart() + fake.read_chunks = [b'not json\n'] + + client = UartClient(fake) + messages = client.read_available_messages() + + assert messages[0]["type"] == "error" + assert messages[0]["message"] == "invalid_json"