Add Pico UART client and tests
This commit is contained in:
@@ -0,0 +1,40 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class UartClient:
|
||||||
|
def __init__(self, uart):
|
||||||
|
self.uart = uart
|
||||||
|
self.buffer = ""
|
||||||
|
|
||||||
|
def send_message(self, message):
|
||||||
|
payload = json.dumps(message, separators=(",", ":")) + "\n"
|
||||||
|
self.uart.write(payload.encode("utf-8"))
|
||||||
|
|
||||||
|
def read_available_messages(self):
|
||||||
|
messages = []
|
||||||
|
|
||||||
|
while self.uart.any():
|
||||||
|
chunk = self.uart.read()
|
||||||
|
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
|
||||||
|
self.buffer += chunk.decode("utf-8")
|
||||||
|
|
||||||
|
while "\n" in self.buffer:
|
||||||
|
line, self.buffer = self.buffer.split("\n", 1)
|
||||||
|
line = line.strip()
|
||||||
|
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
messages.append(json.loads(line))
|
||||||
|
except ValueError:
|
||||||
|
messages.append({
|
||||||
|
"type": "error",
|
||||||
|
"message": "invalid_json",
|
||||||
|
"raw": line,
|
||||||
|
})
|
||||||
|
|
||||||
|
return messages
|
||||||
|
|||||||
@@ -78,3 +78,75 @@ def test_screen_manager():
|
|||||||
|
|
||||||
screens.previous_screen()
|
screens.previous_screen()
|
||||||
assert screens.current_screen == "battery"
|
assert screens.current_screen == "battery"
|
||||||
|
|
||||||
|
|
||||||
|
class FakeUart:
|
||||||
|
def __init__(self):
|
||||||
|
self.writes = []
|
||||||
|
self.read_chunks = []
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self.writes.append(data)
|
||||||
|
|
||||||
|
def any(self):
|
||||||
|
return len(self.read_chunks) > 0
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
if self.read_chunks:
|
||||||
|
return self.read_chunks.pop(0)
|
||||||
|
return b""
|
||||||
|
|
||||||
|
|
||||||
|
def test_uart_client_send_message():
|
||||||
|
from comms.uart_client import UartClient
|
||||||
|
|
||||||
|
fake = FakeUart()
|
||||||
|
client = UartClient(fake)
|
||||||
|
|
||||||
|
client.send_message({"type": "status_request"})
|
||||||
|
|
||||||
|
assert fake.writes == [b'{"type":"status_request"}\n']
|
||||||
|
|
||||||
|
|
||||||
|
def test_uart_client_reads_newline_delimited_json():
|
||||||
|
from comms.uart_client import UartClient
|
||||||
|
|
||||||
|
fake = FakeUart()
|
||||||
|
fake.read_chunks = [
|
||||||
|
b'{"type":"status_response","timestamp":1}\n',
|
||||||
|
b'{"type":"relay_response","relay":"fridge","enabled":true}\n',
|
||||||
|
]
|
||||||
|
|
||||||
|
client = UartClient(fake)
|
||||||
|
messages = client.read_available_messages()
|
||||||
|
|
||||||
|
assert messages == [
|
||||||
|
{"type": "status_response", "timestamp": 1},
|
||||||
|
{"type": "relay_response", "relay": "fridge", "enabled": True},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_uart_client_handles_partial_messages():
|
||||||
|
from comms.uart_client import UartClient
|
||||||
|
|
||||||
|
fake = FakeUart()
|
||||||
|
client = UartClient(fake)
|
||||||
|
|
||||||
|
fake.read_chunks = [b'{"type":"status_', b'response"}\n']
|
||||||
|
|
||||||
|
messages = client.read_available_messages()
|
||||||
|
|
||||||
|
assert messages == [{"type": "status_response"}]
|
||||||
|
|
||||||
|
|
||||||
|
def test_uart_client_handles_invalid_json():
|
||||||
|
from comms.uart_client import UartClient
|
||||||
|
|
||||||
|
fake = FakeUart()
|
||||||
|
fake.read_chunks = [b'not json\n']
|
||||||
|
|
||||||
|
client = UartClient(fake)
|
||||||
|
messages = client.read_available_messages()
|
||||||
|
|
||||||
|
assert messages[0]["type"] == "error"
|
||||||
|
assert messages[0]["message"] == "invalid_json"
|
||||||
|
|||||||
Reference in New Issue
Block a user