import asyncio import json from datetime import datetime import websockets from kivy import Logger from pydantic import BaseModel, field_validator from config import STORE_HOST, STORE_PORT # Pydantic models class ProcessedAgentData(BaseModel): id: int road_state: str user_id: int x: float y: float z: float latitude: float longitude: float timestamp: datetime @classmethod @field_validator("timestamp", mode="before") def check_timestamp(cls, value): if isinstance(value, datetime): return value try: return datetime.fromisoformat(value) except (TypeError, ValueError): raise ValueError( "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)." ) class Datasource: def __init__(self): self.websocket: Connection | None = None self.index = 0 self.connection_status = None self._new_points = [] self._active_markers = [] asyncio.ensure_future(self.connect_to_server()) def get_new_points(self): Logger.debug(self._new_points) points = self._new_points self._new_points = [] return points async def connect_to_server(self): uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws" while True: Logger.debug("CONNECT TO SERVER") async with websockets.connect(uri) as websocket: self.connection_status = "Connected" self.websocket = websocket try: while True: data = await websocket.recv() parsed_data = json.loads(data) self.handle_received_data(parsed_data) except websockets.ConnectionClosedOK: self.connection_status = "Disconnected" Logger.debug("SERVER DISCONNECT") def update_db_record_visibility(self, record_id): if self.websocket: data = json.dumps({"id": record_id}) asyncio.ensure_future(self.websocket.send(data)) def map_lat_lon_to_processed_agent_data(self, lat: float, lon: float) -> ProcessedAgentData | None: distances = tuple((abs(lon - marker.latitude) ** 2 + abs(lat - marker.longitude) ** 2) ** 0.5 for marker in self._active_markers) if len(distances) == 0: return None min_distance = min(distances) marker = self._active_markers[distances.index(min_distance)] if min_distance < 0.005: return marker else: return None def handle_received_data(self, data): # Update your UI or perform actions with received data here Logger.debug(f"Received data: {data}") processed_agent_data_list = sorted( [ ProcessedAgentData(**processed_data_json) for processed_data_json in json.loads(data) ], key=lambda v: v.timestamp, ) self._active_markers += [i for i in processed_agent_data_list if i.road_state != 'normal'] new_points = [ ( processed_agent_data.longitude, processed_agent_data.latitude, processed_agent_data.road_state, processed_agent_data.user_id ) for processed_agent_data in processed_agent_data_list ] self._new_points.extend(new_points)