114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
import time
|
|
|
|
from protocol import (
|
|
status_request,
|
|
set_relay_request,
|
|
enable_wifi_request,
|
|
toggle_ignition_request,
|
|
toggle_sensor_fault_request,
|
|
)
|
|
from transport import RS485Transport
|
|
|
|
|
|
class PicoSimulator:
|
|
def __init__(self, controller):
|
|
self.transport = RS485Transport(controller)
|
|
self.last_status = None
|
|
self.primary_link = "rs485"
|
|
self.backup_link_available = True
|
|
self.messages_sent = 0
|
|
self.messages_received = 0
|
|
self.last_message_time = None
|
|
self.last_latency_ms = None
|
|
|
|
def send_message(self, message):
|
|
self.messages_sent += 1
|
|
start = time.time()
|
|
|
|
response = self.transport.send(message)
|
|
|
|
if response.get("type") == "error":
|
|
return response
|
|
|
|
self.last_latency_ms = round((time.time() - start) * 1000, 1)
|
|
self.messages_received += 1
|
|
self.last_message_time = int(time.time())
|
|
return response
|
|
|
|
def get_status(self):
|
|
response = self.send_message(status_request())
|
|
|
|
if response.get("type") == "error":
|
|
if self.last_status:
|
|
self.last_status["network"]["rs485_connected"] = False
|
|
self.last_status["network"]["communication_lost"] = True
|
|
self.add_comms_to_status(self.last_status)
|
|
return self.last_status
|
|
|
|
return {
|
|
"timestamp": int(time.time()),
|
|
"vehicle": {"ignition_on": True},
|
|
"battery": {},
|
|
"temps": {},
|
|
"sensor_health": {},
|
|
"relays": {},
|
|
"network": {
|
|
"rs485_connected": False,
|
|
"communication_lost": True,
|
|
"wifi_enabled": False,
|
|
"wifi_override_active": False,
|
|
"starlink_enabled": False
|
|
},
|
|
"config": {}
|
|
}
|
|
|
|
self.last_status = response["data"]
|
|
self.last_status["network"]["rs485_connected"] = self.transport.connected
|
|
self.last_status["network"]["communication_lost"] = False
|
|
self.add_comms_to_status(self.last_status)
|
|
return self.last_status
|
|
|
|
def add_comms_to_status(self, status):
|
|
status["network"]["messages_sent"] = self.messages_sent
|
|
status["network"]["messages_received"] = self.messages_received
|
|
status["network"]["last_message_time"] = self.last_message_time
|
|
status["network"]["latency_ms"] = self.last_latency_ms
|
|
status["network"]["packet_loss_percent"] = self.transport.packet_loss_percent
|
|
|
|
def set_relay(self, relay, state):
|
|
return self.send_message(set_relay_request(relay, state))
|
|
|
|
def enable_wifi(self, minutes=10):
|
|
return self.send_message(enable_wifi_request(minutes))
|
|
|
|
def toggle_ignition(self):
|
|
return self.send_message(toggle_ignition_request())
|
|
|
|
def toggle_sensor_fault(self, sensor):
|
|
return self.send_message(toggle_sensor_fault_request(sensor))
|
|
|
|
def disconnect_rs485(self):
|
|
self.transport.disconnect()
|
|
|
|
def restore_rs485(self):
|
|
self.transport.restore()
|
|
|
|
def set_latency(self, latency_ms):
|
|
self.transport.set_latency(latency_ms)
|
|
|
|
def set_packet_loss(self, percent):
|
|
self.transport.set_packet_loss(percent)
|
|
|
|
def get_comms(self):
|
|
return {
|
|
"primary": self.primary_link,
|
|
"backup_available": self.backup_link_available,
|
|
"rs485_connected": self.transport.connected,
|
|
"messages_sent": self.messages_sent,
|
|
"messages_received": self.messages_received,
|
|
"last_message_time": self.last_message_time,
|
|
"latency_ms": self.last_latency_ms,
|
|
"configured_latency_ms": self.transport.latency_ms,
|
|
"packet_loss_percent": self.transport.packet_loss_percent
|
|
}
|