237 lines
6.8 KiB
Python
237 lines
6.8 KiB
Python
import json
import random
import time
from pathlib import Path
from typing import Optional

from protocol import (
    MessageType,
    status_response,
    relay_response,
    wifi_response,
    ignition_response,
    sensor_fault_response,
)
|
|
|
|
|
|
class ESP32Simulator:
    """In-memory simulator that answers controller protocol messages.

    The instance tracks relay states, a temporary Wi-Fi override deadline,
    the ignition flag, the set of temperature sensors currently marked as
    failed, and a simulated battery state of charge (SOC, in percent) that
    drains over wall-clock time according to which relays are on.
    """

    # Simulated battery capacity, in amp-hours.
    BATTERY_CAPACITY_AH = 100

    # Draw that is always present, in amps.
    BASE_LOAD_AMPS = 6.0

    # Additional draw while the named relay is on, in amps.
    RELAY_LOAD_AMPS = {
        "starlink": 4.5,
        "fridge": 2.0,
    }

    # Known temperature sensors: name -> (base reading, +/- random spread).
    # Single source of truth for both status reporting and fault toggling.
    TEMP_SENSORS = {
        "fridge_zone_1": (36, 2),
        "fridge_zone_2": (12, 3),
        "rear_seat": (78, 8),
        "outside": (88, 8),
    }

    def __init__(self):
        """Start with Starlink off, fridge on, ignition on and SOC at 82%."""
        self.relays = {
            "starlink": False,
            "fridge": True,
        }

        # Epoch seconds until which the Wi-Fi override counts as active.
        self.wifi_override_until = 0
        self.ignition_on = True
        self.failed_sensors = set()
        self.soc = 82.0
        # Epoch seconds of the last battery drain calculation.
        self.last_update = time.time()

    @staticmethod
    def _error(text):
        """Build the generic error payload used for unroutable requests."""
        return {
            "type": "error",
            "success": False,
            "error": text,
        }

    def load_config(self):
        """Read and return the parsed ``config.json`` next to this module.

        The file is re-read on every call. Errors from opening or parsing
        the file (e.g. ``FileNotFoundError``, ``json.JSONDecodeError``)
        propagate to the caller.
        """
        config_path = Path(__file__).parent / "config.json"
        # JSON text is UTF-8; do not depend on the platform locale encoding.
        with config_path.open(encoding="utf-8") as f:
            return json.load(f)

    def update_battery(self) -> float:
        """Drain the simulated battery for the time elapsed since last call.

        Returns:
            The current draw in amps as a negative number (discharging).
        """
        now = time.time()
        # time.time() can step backwards (clock adjustment); clamp so a
        # negative interval never "recharges" the battery.
        elapsed_hours = max((now - self.last_update) / 3600, 0.0)
        self.last_update = now

        load_amps = self.BASE_LOAD_AMPS
        for relay_name, amps in self.RELAY_LOAD_AMPS.items():
            if self.relays[relay_name]:
                load_amps += amps

        drained_fraction = load_amps * elapsed_hours / self.BATTERY_CAPACITY_AH
        self.soc -= drained_fraction * 100
        self.soc = max(self.soc, 0)

        return -load_amps

    def handle_message(self, message: dict):
        """Dispatch one protocol message and return its response payload.

        Args:
            message: Mapping with a ``"type"`` key matching a ``MessageType``
                member, plus the fields that message type needs
                (``relay``/``state``, ``minutes``, or ``sensor``).

        Returns:
            The payload built by the matching ``*_response`` helper, or a
            generic ``{"type": "error", ...}`` dict when the message type is
            unknown or the ``minutes`` field cannot be converted to an int.
        """
        message_type = message.get("type")

        if message_type == MessageType.STATUS_REQUEST:
            return status_response(self.get_status())

        if message_type == MessageType.SET_RELAY:
            relay = message.get("relay")
            state = message.get("state")
            if not self.set_relay(relay, state):
                return relay_response(False, error="Unknown relay")
            return relay_response(True, relay=relay, state=self.relays[relay])

        if message_type == MessageType.ENABLE_WIFI:
            try:
                minutes = int(message.get("minutes", 10))
            except (TypeError, ValueError, OverflowError):
                # Malformed input must produce an error reply, not a crash.
                return self._error("Invalid minutes value")
            self.enable_wifi(minutes)
            return wifi_response(True, expires_minutes=minutes)

        if message_type == MessageType.TOGGLE_IGNITION:
            ignition_on = self.toggle_ignition()
            return ignition_response(True, ignition_on)

        if message_type == MessageType.TOGGLE_SENSOR_FAULT:
            sensor = message.get("sensor")
            failed = self.toggle_sensor_fault(sensor)
            if failed is None:
                return sensor_fault_response(False, error="Unknown sensor")
            return sensor_fault_response(True, sensor=sensor, failed=failed)

        return self._error(f"Unknown message type: {message_type}")

    def get_status(self) -> dict:
        """Advance the battery simulation and build a full status snapshot.

        Returns:
            A dict with ``timestamp``, ``vehicle``, ``battery``, ``temps``,
            ``sensor_health``, ``relays``, ``network`` and ``config``
            sections. Failed sensors report ``None`` in ``temps`` and
            ``False`` in ``sensor_health``.

        Raises:
            Whatever :meth:`load_config` raises when the config file cannot
            be read or parsed.
        """
        current = self.update_battery()
        starlink_on = self.relays["starlink"]
        wifi_override_active = time.time() < self.wifi_override_until

        temps = {
            name: None if name in self.failed_sensors else self.sensor(base, spread)
            for name, (base, spread) in self.TEMP_SENSORS.items()
        }
        sensor_health = {
            name: name not in self.failed_sensors
            for name in self.TEMP_SENSORS
        }

        # SOC is a percentage; convert through the capacity instead of
        # relying on the capacity happening to be 100 Ah.
        remaining_ah = round(self.soc * self.BATTERY_CAPACITY_AH / 100, 1)
        runtime_hours = round(remaining_ah / abs(current), 1) if current else 0

        return {
            "timestamp": int(time.time()),
            "vehicle": {
                "ignition_on": self.ignition_on,
            },
            "battery": {
                "soc": round(self.soc),
                "voltage": round(13.0 + (self.soc / 100) * 0.4, 2),
                "current": round(current, 1),
                "remaining_ah": remaining_ah,
                "runtime_hours": runtime_hours,
                "temperature_f": self.sensor(76, 4),
            },
            "temps": temps,
            "sensor_health": sensor_health,
            # Copy so callers cannot mutate simulator state via the snapshot.
            "relays": dict(self.relays),
            "network": {
                "wifi_enabled": starlink_on or wifi_override_active,
                "wifi_override_active": wifi_override_active,
                "rs485_connected": True,
                "starlink_enabled": starlink_on,
            },
            "config": self.load_config(),
        }

    def sensor(self, base, spread):
        """Return a random reading in ``[base - spread, base + spread]``, rounded to 0.1."""
        return round(random.uniform(base - spread, base + spread), 1)

    def set_relay(self, name, state) -> bool:
        """Set relay ``name`` to ``bool(state)``.

        Returns:
            ``True`` when the relay exists and was updated, ``False`` for an
            unknown name (including non-string values such as lists, which
            would otherwise raise ``TypeError`` on the dict lookup).
        """
        if not isinstance(name, str) or name not in self.relays:
            return False

        self.relays[name] = bool(state)
        return True

    def enable_wifi(self, minutes):
        """Mark the Wi-Fi override as active for ``minutes`` from now."""
        self.wifi_override_until = time.time() + minutes * 60

    def toggle_ignition(self) -> bool:
        """Flip the ignition flag and return its new value."""
        self.ignition_on = not self.ignition_on
        return self.ignition_on

    def toggle_sensor_fault(self, name) -> Optional[bool]:
        """Toggle the simulated fault state of temperature sensor ``name``.

        Returns:
            ``True`` if the sensor is now failed, ``False`` if the fault was
            cleared, or ``None`` when ``name`` is not a known sensor.
        """
        if not isinstance(name, str) or name not in self.TEMP_SENSORS:
            return None

        if name in self.failed_sensors:
            self.failed_sensors.remove(name)
            return False

        self.failed_sensors.add(name)
        return True
|
|
|
|
|
|
# Module-level simulator instance, constructed when this module is imported.
# NOTE(review): presumably shared by whatever imports this module — confirm.
esp32 = ESP32Simulator()
|