From 2eb36a2d85da1048c4a58da18a8f90b0618f8342 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 3 Jun 2026 00:27:10 -0600 Subject: [PATCH] Add Pico simulator and shared protocol layer --- simulator/app.py | 48 ++++++++------------ simulator/esp32_sim.py | 100 +++++++++++++++++++++++++++++++++++++++++ simulator/pico_sim.py | 42 +++++++++++++++++ simulator/protocol.py | 84 ++++++++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+), 30 deletions(-) create mode 100644 simulator/pico_sim.py create mode 100644 simulator/protocol.py diff --git a/simulator/app.py b/simulator/app.py index 773ca2c..a4c0067 100644 --- a/simulator/app.py +++ b/simulator/app.py @@ -1,8 +1,11 @@ from flask import Flask, jsonify, render_template, request from esp32_sim import esp32 +from pico_sim import PicoSimulator app = Flask(__name__) +pico = PicoSimulator(esp32) + @app.route("/") def index(): @@ -11,75 +14,60 @@ def index(): @app.route("/status") def status(): - return jsonify(esp32.get_status()) + return jsonify(pico.get_status()) @app.route("/battery") def battery(): - return jsonify(esp32.get_status()["battery"]) + return jsonify(pico.get_status()["battery"]) @app.route("/temps") def temps(): - return jsonify(esp32.get_status()["temps"]) + return jsonify(pico.get_status()["temps"]) @app.route("/relays") def relays(): - return jsonify(esp32.relays) + return jsonify(pico.get_status()["relays"]) @app.route("/relay/", methods=["POST"]) def set_relay(name): data = request.get_json(force=True) - success = esp32.set_relay(name, data.get("state", False)) + response = pico.set_relay(name, data.get("state", False)) - if not success: - return jsonify({"success": False, "error": "Unknown relay"}), 404 + if not response["success"]: + return jsonify(response), 404 - return jsonify({ - "success": True, - name: esp32.relays[name] - }) + return jsonify(response) @app.route("/network") def network(): - return jsonify(esp32.get_status()["network"]) + return jsonify(pico.get_status()["network"]) @app.route("/network/wifi", methods=["POST"]) def enable_wifi(): data = request.get_json(force=True) minutes = int(data.get("minutes", 10)) - esp32.enable_wifi(minutes) - - return jsonify({ - "success": True, - "expires_minutes": minutes - }) + return jsonify(pico.enable_wifi(minutes)) @app.route("/vehicle/ignition", methods=["POST"]) def toggle_ignition(): - return jsonify({ - "success": True, - "ignition_on": esp32.toggle_ignition() - }) + return jsonify(pico.toggle_ignition()) @app.route("/sensor//fault", methods=["POST"]) def toggle_sensor_fault(name): - failed = esp32.toggle_sensor_fault(name) + response = pico.toggle_sensor_fault(name) - if failed is None: - return jsonify({"success": False, "error": "Unknown sensor"}), 404 + if not response["success"]: + return jsonify(response), 404 - return jsonify({ - "success": True, - "sensor": name, - "failed": failed - }) + return jsonify(response) if __name__ == "__main__": diff --git a/simulator/esp32_sim.py b/simulator/esp32_sim.py index 464b754..9a7d68c 100644 --- a/simulator/esp32_sim.py +++ b/simulator/esp32_sim.py @@ -3,6 +3,24 @@ import random import time from pathlib import Path +from protocol import ( + MessageType, + status_response, + relay_response, + wifi_response, + ignition_response, + sensor_fault_response, +) + +from protocol import ( + MessageType, + status_response, + relay_response, + wifi_response, + ignition_response, + sensor_fault_response, +) + class ESP32Simulator: def __init__(self): @@ -39,6 +57,88 @@ class ESP32Simulator: return -load_amps + + def handle_message(self, message): + message_type = message.get("type") + + if message_type == MessageType.STATUS_REQUEST: + return status_response(self.get_status()) + + if message_type == MessageType.SET_RELAY: + relay = message.get("relay") + state = message.get("state") + success = self.set_relay(relay, state) + + if not success: + return relay_response(False, error="Unknown relay") + + return relay_response(True, relay=relay, state=self.relays[relay]) + + if message_type == MessageType.ENABLE_WIFI: + minutes = int(message.get("minutes", 10)) + self.enable_wifi(minutes) + return wifi_response(True, expires_minutes=minutes) + + if message_type == MessageType.TOGGLE_IGNITION: + ignition_on = self.toggle_ignition() + return ignition_response(True, ignition_on) + + if message_type == MessageType.TOGGLE_SENSOR_FAULT: + sensor = message.get("sensor") + failed = self.toggle_sensor_fault(sensor) + + if failed is None: + return sensor_fault_response(False, error="Unknown sensor") + + return sensor_fault_response(True, sensor=sensor, failed=failed) + + return { + "type": "error", + "success": False, + "error": f"Unknown message type: {message_type}" + } + + + def handle_message(self, message): + message_type = message.get("type") + + if message_type == MessageType.STATUS_REQUEST: + return status_response(self.get_status()) + + if message_type == MessageType.SET_RELAY: + relay = message.get("relay") + state = message.get("state") + success = self.set_relay(relay, state) + + if not success: + return relay_response(False, error="Unknown relay") + + return relay_response(True, relay=relay, state=self.relays[relay]) + + if message_type == MessageType.ENABLE_WIFI: + minutes = int(message.get("minutes", 10)) + self.enable_wifi(minutes) + return wifi_response(True, expires_minutes=minutes) + + if message_type == MessageType.TOGGLE_IGNITION: + ignition_on = self.toggle_ignition() + return ignition_response(True, ignition_on) + + if message_type == MessageType.TOGGLE_SENSOR_FAULT: + sensor = message.get("sensor") + failed = self.toggle_sensor_fault(sensor) + + if failed is None: + return sensor_fault_response(False, error="Unknown sensor") + + return sensor_fault_response(True, sensor=sensor, failed=failed) + + return { + "type": "error", + "success": False, + "error": f"Unknown message type: {message_type}" + } + def get_status(self): current = self.update_battery() starlink_on = self.relays["starlink"] diff --git a/simulator/pico_sim.py b/simulator/pico_sim.py new file mode 100644 index 0000000..70d1821 --- /dev/null +++ b/simulator/pico_sim.py @@ -0,0 +1,42 @@ +from protocol import ( + MessageType, + status_request, + set_relay_request, + enable_wifi_request, + toggle_ignition_request, + toggle_sensor_fault_request, +) + + +class PicoSimulator: + def __init__(self, controller): + self.controller = controller + self.last_status = None + self.primary_link = "rs485" + self.backup_link_available = True + + def send_message(self, message): + """ + This simulates the Pico sending a protocol message to the ESP32. + + Later, this is where RS-485 serial communication will replace + the direct Python method call. + """ + return self.controller.handle_message(message) + + def get_status(self): + response = self.send_message(status_request()) + self.last_status = response["data"] + return self.last_status + + def set_relay(self, relay, state): + return self.send_message(set_relay_request(relay, state)) + + def enable_wifi(self, minutes=10): + return self.send_message(enable_wifi_request(minutes)) + + def toggle_ignition(self): + return self.send_message(toggle_ignition_request()) + + def toggle_sensor_fault(self, sensor): + return self.send_message(toggle_sensor_fault_request(sensor)) diff --git a/simulator/protocol.py b/simulator/protocol.py new file mode 100644 index 0000000..6259640 --- /dev/null +++ b/simulator/protocol.py @@ -0,0 +1,84 @@ +class MessageType: + STATUS_REQUEST = "status_request" + STATUS_RESPONSE = "status_response" + SET_RELAY = "set_relay" + RELAY_RESPONSE = "relay_response" + ENABLE_WIFI = "enable_wifi" + WIFI_RESPONSE = "wifi_response" + TOGGLE_IGNITION = "toggle_ignition" + IGNITION_RESPONSE = "ignition_response" + TOGGLE_SENSOR_FAULT = "toggle_sensor_fault" + SENSOR_FAULT_RESPONSE = "sensor_fault_response" + + +def status_request(): + return {"type": MessageType.STATUS_REQUEST} + + +def status_response(data): + return { + "type": MessageType.STATUS_RESPONSE, + "data": data + } + + +def set_relay_request(relay, state): + return { + "type": MessageType.SET_RELAY, + "relay": relay, + "state": bool(state) + } + + +def relay_response(success, relay=None, state=None, error=None): + return { + "type": MessageType.RELAY_RESPONSE, + "success": success, + "relay": relay, + "state": state, + "error": error + } + + +def enable_wifi_request(minutes): + return { + "type": MessageType.ENABLE_WIFI, + "minutes": int(minutes) + } + + +def wifi_response(success, expires_minutes=None): + return { + "type": MessageType.WIFI_RESPONSE, + "success": success, + "expires_minutes": expires_minutes + } + + +def toggle_ignition_request(): + return {"type": MessageType.TOGGLE_IGNITION} + + +def ignition_response(success, ignition_on): + return { + "type": MessageType.IGNITION_RESPONSE, + "success": success, + "ignition_on": ignition_on + } + + +def toggle_sensor_fault_request(sensor): + return { + "type": MessageType.TOGGLE_SENSOR_FAULT, + "sensor": sensor + } + + +def sensor_fault_response(success, sensor=None, failed=None, error=None): + return { + "type": MessageType.SENSOR_FAULT_RESPONSE, + "success": success, + "sensor": sensor, + "failed": failed, + "error": error + }