Add RS485 transport simulation layer

This commit is contained in:
root
2026-06-03 00:52:33 -06:00
parent 131589fe6e
commit eff60ed9d2
4 changed files with 133 additions and 24 deletions
+33 -24
View File
@@ -7,30 +7,30 @@ from protocol import (
toggle_ignition_request,
toggle_sensor_fault_request,
)
from transport import RS485Transport
class PicoSimulator:
def __init__(self, controller):
self.controller = controller
self.transport = RS485Transport(controller)
self.last_status = None
self.primary_link = "rs485"
self.rs485_connected = True
self.backup_link_available = True
self.messages_sent = 0
self.messages_received = 0
self.last_message_time = None
self.last_latency_ms = None
def send_message(self, message):
self.messages_sent += 1
start = time.time()
if not self.rs485_connected:
return {
"type": "error",
"success": False,
"error": "RS-485 disconnected"
}
response = self.transport.send(message)
response = self.controller.handle_message(message)
if response.get("type") == "error":
return response
self.last_latency_ms = round((time.time() - start) * 1000, 1)
self.messages_received += 1
self.last_message_time = int(time.time())
return response
@@ -42,9 +42,7 @@ class PicoSimulator:
if self.last_status:
self.last_status["network"]["rs485_connected"] = False
self.last_status["network"]["communication_lost"] = True
self.last_status["network"]["messages_sent"] = self.messages_sent
self.last_status["network"]["messages_received"] = self.messages_received
self.last_status["network"]["last_message_time"] = self.last_message_time
self.add_comms_to_status(self.last_status)
return self.last_status
return {
@@ -59,22 +57,24 @@ class PicoSimulator:
"communication_lost": True,
"wifi_enabled": False,
"wifi_override_active": False,
"starlink_enabled": False,
"messages_sent": self.messages_sent,
"messages_received": self.messages_received,
"last_message_time": self.last_message_time
"starlink_enabled": False
},
"config": {}
}
self.last_status = response["data"]
self.last_status["network"]["rs485_connected"] = self.rs485_connected
self.last_status["network"]["rs485_connected"] = self.transport.connected
self.last_status["network"]["communication_lost"] = False
self.last_status["network"]["messages_sent"] = self.messages_sent
self.last_status["network"]["messages_received"] = self.messages_received
self.last_status["network"]["last_message_time"] = self.last_message_time
self.add_comms_to_status(self.last_status)
return self.last_status
def add_comms_to_status(self, status):
status["network"]["messages_sent"] = self.messages_sent
status["network"]["messages_received"] = self.messages_received
status["network"]["last_message_time"] = self.last_message_time
status["network"]["latency_ms"] = self.last_latency_ms
status["network"]["packet_loss_percent"] = self.transport.packet_loss_percent
def set_relay(self, relay, state):
return self.send_message(set_relay_request(relay, state))
@@ -88,17 +88,26 @@ class PicoSimulator:
return self.send_message(toggle_sensor_fault_request(sensor))
def disconnect_rs485(self):
self.rs485_connected = False
self.transport.disconnect()
def restore_rs485(self):
self.rs485_connected = True
self.transport.restore()
def set_latency(self, latency_ms):
self.transport.set_latency(latency_ms)
def set_packet_loss(self, percent):
self.transport.set_packet_loss(percent)
def get_comms(self):
return {
"primary": self.primary_link,
"backup_available": self.backup_link_available,
"rs485_connected": self.rs485_connected,
"rs485_connected": self.transport.connected,
"messages_sent": self.messages_sent,
"messages_received": self.messages_received,
"last_message_time": self.last_message_time
"last_message_time": self.last_message_time,
"latency_ms": self.last_latency_ms,
"configured_latency_ms": self.transport.latency_ms,
"packet_loss_percent": self.transport.packet_loss_percent
}