Compare commits

..

5 Commits

Author SHA1 Message Date
3b49638afa Correct pothole deletion function
All checks were successful
Component testing / Hub testing (push) Successful in 26s
Component testing / Store testing (push) Successful in 26s
Component testing / Integration smoke testing (push) Successful in 2m33s
2026-03-25 19:52:54 +01:00
366488a8df Use lat lon validation 2026-03-25 19:52:33 +01:00
7355d64971 Add lat lon to pothole mapper 2026-03-25 19:51:58 +01:00
fe4ec12428 Add type hints 2026-03-25 19:51:10 +01:00
805b2658d4 [P] Add bump remove function
All checks were successful
Component testing / Hub testing (push) Successful in 28s
Component testing / Store testing (push) Successful in 32s
Component testing / Integration smoke testing (push) Successful in 2m43s
2026-03-25 19:05:16 +01:00
3 changed files with 38 additions and 26 deletions

View File

@@ -14,18 +14,31 @@ line_layer_colors = [
[1, 0, 1, 1], [1, 0, 1, 1],
] ]
def get_lat_lon(point: dict[str, float] | list[float] | tuple[float, float]) -> tuple[float, float] | None:
if isinstance(point, dict):
lat = point.get("lat")
lon = point.get("lon")
else:
lat, lon = point
if lat is None or lon is None:
return None
return lat, lon
class MapViewApp(App): class MapViewApp(App):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.mapview = None self.mapview: MapView | None = None
self.datasource = Datasource(user_id=1) self.datasource = Datasource(user_id=1)
self.line_layers = dict() self.line_layers = dict()
self.car_markers = dict() self.car_markers = dict()
# додати необхідні змінні # додати необхідні змінні
self.bump_markers = [] self.bump_markers: list[MapMarker] = []
self.pothole_markers = [] self.pothole_markers: list[MapMarker] = []
def on_start(self): def on_start(self):
""" """
@@ -90,13 +103,14 @@ class MapViewApp(App):
if user_id == 1: if user_id == 1:
self.mapview.center_on(lat, lon) self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def map_lat_lon_to_pothole(self, lat: float, lon: float) -> MapMarker | None:
if isinstance(point, dict): click_tolerance = self.mapview.zoom * 10
lat = point.get("lat") flt = filter(lambda marker: abs(lat - marker.lat) + abs(lon - marker.lon) < click_tolerance,
lon = point.get("lon") self.pothole_markers)
else: return next(flt)
lat, lon = point
def set_pothole_marker(self, point):
lat, lon = get_lat_lon(point)
if lat is None or lon is None: if lat is None or lon is None:
return return
@@ -110,24 +124,29 @@ class MapViewApp(App):
self.pothole_markers.append(marker) self.pothole_markers.append(marker)
def set_bump_marker(self, point): def set_bump_marker(self, point):
if isinstance(point, dict): lat, lon = get_lat_lon(point)
lat = point.get("lat")
lon = point.get("lon")
else:
lat, lon = point
if lat is None or lon is None: if lat is None or lon is None:
return return
marker = MapMarker( marker = MapMarker(
lat=lat, lat=lat,
lon=lon, lon=lon,
source="images/bump.png" source="images/bump.png"
) )
self.mapview.add_marker(marker) self.mapview.add_marker(marker)
self.bump_markers.append(marker) self.bump_markers.append(marker)
def delete_pothole_marker(self, point):
lat, lon = get_lat_lon(point)
if lat is None or lon is None:
return
pothole = self.map_lat_lon_to_pothole(lat, lon)
if pothole:
self.mapview.remove_marker(pothole)
self.pothole_markers.pop(self.pothole_markers.index(pothole))
def build(self): def build(self):
""" """

View File

@@ -16,8 +16,6 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -31,14 +29,13 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...") logging.info(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
logging.error(f"Failed to send message to topic {topic}") print(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource

View File

@@ -1,5 +1,4 @@
import sys import sys
import os
print("Checking for dead containers...") print("Checking for dead containers...")
@@ -15,9 +14,6 @@ for i in statuses:
if not i[status_index:].startswith("Up "): if not i[status_index:].startswith("Up "):
service_name = i[name_index:] service_name = i[name_index:]
print(f"Crash detected in {service_name}") print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1 exit_code = 1
sys.exit(exit_code) sys.exit(exit_code)