Add Pico dashboard core state and alarm logic

This commit is contained in:
root
2026-06-03 02:33:32 -06:00
parent 503af6cd8c
commit 0d239361fd
5 changed files with 174 additions and 0 deletions
@@ -0,0 +1,13 @@
BATTERY_SOC_LOW = "battery_soc_low"
BATTERY_VOLTAGE_LOW = "battery_voltage_low"
FRIDGE_ZONE_1_HIGH = "fridge_zone_1_high"
FRIDGE_ZONE_2_HIGH = "fridge_zone_2_high"
SENSOR_FAILURE = "sensor_failure"
COMMUNICATION_LOST = "communication_lost"
DEFAULT_THRESHOLDS = {
"battery_soc_low": 20,
"battery_voltage_low": 12.0,
"fridge_temp_high": 40.0,
}
+44
View File
@@ -0,0 +1,44 @@
from .alarm_definitions import (
BATTERY_SOC_LOW,
BATTERY_VOLTAGE_LOW,
FRIDGE_ZONE_1_HIGH,
FRIDGE_ZONE_2_HIGH,
SENSOR_FAILURE,
COMMUNICATION_LOST,
DEFAULT_THRESHOLDS,
)
class AlarmManager:
def __init__(self, thresholds=None):
self.thresholds = dict(DEFAULT_THRESHOLDS)
if thresholds:
self.thresholds.update(thresholds)
def evaluate(self, state):
alarms = []
soc = state.battery.get("soc")
if soc is not None and soc < self.thresholds["battery_soc_low"]:
alarms.append(BATTERY_SOC_LOW)
voltage = state.battery.get("voltage")
if voltage is not None and voltage < self.thresholds["battery_voltage_low"]:
alarms.append(BATTERY_VOLTAGE_LOW)
z1 = state.temps.get("fridge_zone_1")
if z1 is not None and z1 > self.thresholds["fridge_temp_high"]:
alarms.append(FRIDGE_ZONE_1_HIGH)
z2 = state.temps.get("fridge_zone_2")
if z2 is not None and z2 > self.thresholds["fridge_temp_high"]:
alarms.append(FRIDGE_ZONE_2_HIGH)
if not state.network.get("uart_connected", False):
alarms.append(COMMUNICATION_LOST)
for sensor_name, online in state.sensor_health.items():
if not online:
alarms.append(f"{SENSOR_FAILURE}:{sensor_name}")
return alarms
+35
View File
@@ -0,0 +1,35 @@
STATUS_REQUEST = "status_request"
STATUS_RESPONSE = "status_response"
SET_RELAY = "set_relay"
RELAY_RESPONSE = "relay_response"
ERROR = "error"
VALID_RELAYS = {"starlink", "fridge"}
def make_status_request():
return {"type": STATUS_REQUEST}
def make_set_relay(relay, enabled):
if relay not in VALID_RELAYS:
raise ValueError(f"Invalid relay: {relay}")
return {
"type": SET_RELAY,
"relay": relay,
"enabled": bool(enabled),
}
def is_status_response(message):
return message.get("type") == STATUS_RESPONSE
def is_relay_response(message):
return message.get("type") == RELAY_RESPONSE
def is_error(message):
return message.get("type") == ERROR
+61
View File
@@ -0,0 +1,61 @@
class AppState:
def __init__(self):
self.battery = {
"soc": None,
"voltage": None,
"current": None,
"remaining_ah": None,
"runtime_hours": None,
"temperature_f": None,
}
self.temps = {
"fridge_zone_1": None,
"fridge_zone_2": None,
"rear_seat": None,
"outside": None,
}
self.sensor_health = {
"fridge_zone_1": False,
"fridge_zone_2": False,
"rear_seat": False,
"outside": False,
}
self.relays = {
"starlink": False,
"fridge": False,
}
self.vehicle = {
"ignition_on": False,
}
self.network = {
"uart_connected": False,
"wifi_enabled": False,
}
self.last_error = None
self.last_status_timestamp = None
def update_from_status(self, message):
self.last_status_timestamp = message.get("timestamp")
self.battery.update(message.get("battery", {}))
self.temps.update(message.get("temps", {}))
self.sensor_health.update(message.get("sensor_health", {}))
self.relays.update(message.get("relays", {}))
self.vehicle.update(message.get("vehicle", {}))
self.network.update(message.get("network", {}))
def update_from_relay_response(self, message):
relay = message.get("relay")
enabled = message.get("enabled")
if relay in self.relays and enabled is not None:
self.relays[relay] = bool(enabled)
def set_error(self, message):
self.last_error = message
+21
View File
@@ -0,0 +1,21 @@
class ScreenManager:
VALID_SCREENS = ("dashboard", "battery", "temps", "power", "system")
def __init__(self):
self.current_screen = "dashboard"
def go_to(self, screen_name):
if screen_name not in self.VALID_SCREENS:
raise ValueError(f"Invalid screen: {screen_name}")
self.current_screen = screen_name
def next_screen(self):
current_index = self.VALID_SCREENS.index(self.current_screen)
next_index = (current_index + 1) % len(self.VALID_SCREENS)
self.current_screen = self.VALID_SCREENS[next_index]
def previous_screen(self):
current_index = self.VALID_SCREENS.index(self.current_screen)
previous_index = (current_index - 1) % len(self.VALID_SCREENS)
self.current_screen = self.VALID_SCREENS[previous_index]