# Source: xterra-overland-dashboard/simulator/esp32_sim.py
# Snapshot: 2026-06-03 00:36:08 -06:00 (245 lines, 7.0 KiB, Python)

import json
import random
import time
from pathlib import Path
from protocol import (
MessageType,
status_response,
relay_response,
wifi_response,
ignition_response,
sensor_fault_response,
)
from protocol import (
MessageType,
status_response,
relay_response,
wifi_response,
ignition_response,
sensor_fault_response,
)
class ESP32Simulator:
    """In-memory simulator that answers dashboard protocol messages.

    The simulator keeps relay states, an ignition flag, a set of sensors
    flagged as failed, and a battery state of charge that drains over
    wall-clock time according to which relays are switched on.
    """

    # Simulated temperature sensors mapped to the (base, spread) pair that is
    # handed to sensor(). Insertion order is the order readings are generated
    # and reported in.
    SENSOR_BASELINES = {
        "fridge_zone_1": (36, 2),
        "fridge_zone_2": (12, 3),
        "rear_seat": (78, 8),
        "outside": (88, 8),
    }

    # Battery capacity used both for the drain calculation and for converting
    # the state-of-charge percentage into remaining amp-hours.
    CAPACITY_AH = 100

    def __init__(self):
        """Start with the fridge on, Starlink off, ignition on and 82% SOC."""
        self.relays = {
            "starlink": False,
            "fridge": True,
        }
        self.wifi_override_until = 0
        self.ignition_on = True
        self.failed_sensors = set()
        self.soc = 82.0
        self.last_update = time.time()

    def config_path(self):
        """Return the path of ``config.json`` next to this module."""
        return Path(__file__).parent / "config.json"

    def load_config(self):
        """Read and return the parsed contents of the config file.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
        """
        with self.config_path().open(encoding="utf-8") as f:
            return json.load(f)

    def save_config(self, config):
        """Write *config* to the config file and return it re-read from disk."""
        with self.config_path().open("w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)
        return self.load_config()

    def update_battery(self):
        """Drain the state of charge for the time elapsed since the last call.

        Returns:
            The simulated battery current as a negative number (discharge).
        """
        now = time.time()
        # time.time() follows the wall clock and may step backwards; never let
        # a negative interval charge the battery.
        elapsed_hours = max(now - self.last_update, 0.0) / 3600
        self.last_update = now

        load_amps = 6.0
        if self.relays["starlink"]:
            load_amps += 4.5
        if self.relays["fridge"]:
            load_amps += 2.0

        self.soc -= (load_amps * elapsed_hours / self.CAPACITY_AH) * 100
        self.soc = max(self.soc, 0)
        return -load_amps

    @staticmethod
    def _error_response(error):
        """Build the generic error payload used when no typed response fits."""
        return {
            "type": "error",
            "success": False,
            "error": error,
        }

    def handle_message(self, message):
        """Dispatch one protocol message and return the response payload.

        Args:
            message: mapping with a ``"type"`` entry plus any type-specific
                fields (``relay``/``state``, ``minutes``, ``sensor``).

        Returns:
            The payload built by the matching ``protocol`` response helper, or
            a generic error dict for an unknown type or an invalid ``minutes``.
        """
        message_type = message.get("type")

        if message_type == MessageType.STATUS_REQUEST:
            return status_response(self.get_status())

        if message_type == MessageType.SET_RELAY:
            relay = message.get("relay")
            state = message.get("state")
            if not self.set_relay(relay, state):
                return relay_response(False, error="Unknown relay")
            return relay_response(True, relay=relay, state=self.relays[relay])

        if message_type == MessageType.ENABLE_WIFI:
            raw_minutes = message.get("minutes", 10)
            try:
                minutes = int(raw_minutes)
            except (TypeError, ValueError, OverflowError):
                # A null or non-numeric value should yield an error reply, not
                # an exception out of the message handler.
                return self._error_response(f"Invalid minutes: {raw_minutes!r}")
            self.enable_wifi(minutes)
            return wifi_response(True, expires_minutes=minutes)

        if message_type == MessageType.TOGGLE_IGNITION:
            ignition_on = self.toggle_ignition()
            return ignition_response(True, ignition_on)

        if message_type == MessageType.TOGGLE_SENSOR_FAULT:
            sensor = message.get("sensor")
            failed = self.toggle_sensor_fault(sensor)
            if failed is None:
                return sensor_fault_response(False, error="Unknown sensor")
            return sensor_fault_response(True, sensor=sensor, failed=failed)

        return self._error_response(f"Unknown message type: {message_type}")

    def get_status(self):
        """Advance the battery model and return a full status snapshot.

        Sensors listed in ``failed_sensors`` report ``None`` as their
        temperature and ``False`` in ``sensor_health``. The config file is
        re-read on every call.
        """
        current = self.update_battery()
        starlink_on = self.relays["starlink"]
        wifi_override_active = time.time() < self.wifi_override_until

        temps = {
            name: None if name in self.failed_sensors else self.sensor(base, spread)
            for name, (base, spread) in self.SENSOR_BASELINES.items()
        }
        sensor_health = {
            name: name not in self.failed_sensors
            for name in self.SENSOR_BASELINES
        }

        # Convert the SOC percentage to amp-hours through the capacity rather
        # than assuming the two numbers coincide.
        remaining_ah = round(self.soc / 100 * self.CAPACITY_AH, 1)
        runtime_hours = round(remaining_ah / abs(current), 1) if current else 0

        return {
            "timestamp": int(time.time()),
            "vehicle": {
                "ignition_on": self.ignition_on,
            },
            "battery": {
                "soc": round(self.soc),
                "voltage": round(13.0 + (self.soc / 100) * 0.4, 2),
                "current": round(current, 1),
                "remaining_ah": remaining_ah,
                "runtime_hours": runtime_hours,
                "temperature_f": self.sensor(76, 4),
            },
            "temps": temps,
            "sensor_health": sensor_health,
            # Hand out a copy so callers cannot mutate the live relay state.
            "relays": dict(self.relays),
            "network": {
                "wifi_enabled": starlink_on or wifi_override_active,
                "wifi_override_active": wifi_override_active,
                "rs485_connected": True,
                "starlink_enabled": starlink_on,
            },
            "config": self.load_config(),
        }

    def sensor(self, base, spread):
        """Return a random reading within ``base ± spread``, to one decimal."""
        return round(random.uniform(base - spread, base + spread), 1)

    def set_relay(self, name, state):
        """Set relay *name* to the truthiness of *state*.

        Returns:
            True when the relay exists and was updated, False otherwise.
        """
        # Relay keys are strings; rejecting other types up front also avoids a
        # TypeError when an unhashable value (e.g. a list) is passed.
        if not isinstance(name, str) or name not in self.relays:
            return False
        self.relays[name] = bool(state)
        return True

    def enable_wifi(self, minutes):
        """Keep the Wi-Fi override active for *minutes* from now."""
        self.wifi_override_until = time.time() + minutes * 60

    def toggle_ignition(self):
        """Flip the ignition flag and return its new value."""
        self.ignition_on = not self.ignition_on
        return self.ignition_on

    def toggle_sensor_fault(self, name):
        """Flip the failed flag of sensor *name*.

        Returns:
            True if the sensor is now marked failed, False if the fault was
            cleared, or None when *name* is not a known sensor.
        """
        if not isinstance(name, str) or name not in self.SENSOR_BASELINES:
            return None
        if name in self.failed_sensors:
            self.failed_sensors.remove(name)
            return False
        self.failed_sensors.add(name)
        return True
# Module-level simulator instance, created once when this module is imported.
esp32 = ESP32Simulator()