Add Pico simulator and shared protocol layer

This commit is contained in:
root
2026-06-03 00:27:10 -06:00
parent cdff42ddd4
commit 2eb36a2d85
4 changed files with 244 additions and 30 deletions
+100
View File
@@ -3,6 +3,24 @@ import random
import time
from pathlib import Path
from protocol import (
MessageType,
status_response,
relay_response,
wifi_response,
ignition_response,
sensor_fault_response,
)
from protocol import (
MessageType,
status_response,
relay_response,
wifi_response,
ignition_response,
sensor_fault_response,
)
class ESP32Simulator:
def __init__(self):
@@ -39,6 +57,88 @@ class ESP32Simulator:
return -load_amps
def handle_message(self, message):
message_type = message.get("type")
if message_type == MessageType.STATUS_REQUEST:
return status_response(self.get_status())
if message_type == MessageType.SET_RELAY:
relay = message.get("relay")
state = message.get("state")
success = self.set_relay(relay, state)
if not success:
return relay_response(False, error="Unknown relay")
return relay_response(True, relay=relay, state=self.relays[relay])
if message_type == MessageType.ENABLE_WIFI:
minutes = int(message.get("minutes", 10))
self.enable_wifi(minutes)
return wifi_response(True, expires_minutes=minutes)
if message_type == MessageType.TOGGLE_IGNITION:
ignition_on = self.toggle_ignition()
return ignition_response(True, ignition_on)
if message_type == MessageType.TOGGLE_SENSOR_FAULT:
sensor = message.get("sensor")
failed = self.toggle_sensor_fault(sensor)
if failed is None:
return sensor_fault_response(False, error="Unknown sensor")
return sensor_fault_response(True, sensor=sensor, failed=failed)
return {
"type": "error",
"success": False,
"error": f"Unknown message type: {message_type}"
}
def handle_message(self, message):
message_type = message.get("type")
if message_type == MessageType.STATUS_REQUEST:
return status_response(self.get_status())
if message_type == MessageType.SET_RELAY:
relay = message.get("relay")
state = message.get("state")
success = self.set_relay(relay, state)
if not success:
return relay_response(False, error="Unknown relay")
return relay_response(True, relay=relay, state=self.relays[relay])
if message_type == MessageType.ENABLE_WIFI:
minutes = int(message.get("minutes", 10))
self.enable_wifi(minutes)
return wifi_response(True, expires_minutes=minutes)
if message_type == MessageType.TOGGLE_IGNITION:
ignition_on = self.toggle_ignition()
return ignition_response(True, ignition_on)
if message_type == MessageType.TOGGLE_SENSOR_FAULT:
sensor = message.get("sensor")
failed = self.toggle_sensor_fault(sensor)
if failed is None:
return sensor_fault_response(False, error="Unknown sensor")
return sensor_fault_response(True, sensor=sensor, failed=failed)
return {
"type": "error",
"success": False,
"error": f"Unknown message type: {message_type}"
}
def get_status(self):
current = self.update_battery()
starlink_on = self.relays["starlink"]