Compare commits

..

5 Commits

Author SHA1 Message Date
119547d288 [P] Add TRACK_ID selection in MapView
All checks were successful
Component testing / Hub testing (push) Successful in 41s
Component testing / Store testing (push) Successful in 24s
Component testing / Integration smoke testing (push) Successful in 2m30s
2026-03-25 23:27:20 +02:00
b58167f0de [P] Fix wrong row sending order
All checks were successful
Component testing / Hub testing (push) Successful in 35s
Component testing / Store testing (push) Successful in 26s
Component testing / Integration smoke testing (push) Successful in 2m37s
2026-03-25 23:26:39 +02:00
121bd007b3 [P] Add container logs printout on crash
Some checks failed
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Has been cancelled
2026-03-25 22:56:12 +02:00
db63eb6d79 [P] Improve logging logic in agent
All checks were successful
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
77d6968297 [P] Fix map lat, lon confusion
All checks were successful
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
8 changed files with 21 additions and 35 deletions

View File

@@ -2,3 +2,5 @@ import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost" STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000 STORE_PORT = os.environ.get("STORE_PORT") or 8000
TRACK_ID = int(os.environ.get("TID") or '1')

View File

@@ -72,8 +72,8 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.latitude,
processed_agent_data.longitude, processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id processed_agent_data.user_id
) )

View File

@@ -4,6 +4,7 @@ from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock from kivy.clock import Clock
from lineMapLayer import LineMapLayer from lineMapLayer import LineMapLayer
from datasource import Datasource from datasource import Datasource
import config
line_layer_colors = [ line_layer_colors = [
[1, 0, 0, 1], [1, 0, 0, 1],
@@ -87,6 +88,7 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon self.car_markers[user_id].lon = lon
if user_id == config.TRACK_ID:
self.mapview.center_on(lat, lon) self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):

View File

@@ -16,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -29,13 +31,14 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.info(f"Published to {topic}: {msg[:50]}...") logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") logging.error(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource

View File

@@ -7,6 +7,5 @@ CREATE TABLE processed_agent_data (
z FLOAT, z FLOAT,
latitude FLOAT, latitude FLOAT,
longitude FLOAT, longitude FLOAT,
timestamp TIMESTAMP, timestamp TIMESTAMP
visible BOOLEAN
); );

View File

@@ -8,13 +8,12 @@ from sqlalchemy import (
Integer, Integer,
String, String,
Float, Float,
Boolean,
DateTime, DateTime,
) )
from sqlalchemy.sql import select from sqlalchemy.sql import select
from database import metadata, SessionLocal from database import metadata, SessionLocal
from schemas import ProcessedAgentData, ProcessedAgentDataInDB, WebSocketData from schemas import ProcessedAgentData, ProcessedAgentDataInDB
# FastAPI app setup # FastAPI app setup
app = FastAPI() app = FastAPI()
@@ -31,7 +30,6 @@ processed_agent_data = Table(
Column("latitude", Float), Column("latitude", Float),
Column("longitude", Float), Column("longitude", Float),
Column("timestamp", DateTime), Column("timestamp", DateTime),
Column("visible", Boolean),
) )
# WebSocket subscriptions # WebSocket subscriptions
@@ -47,7 +45,7 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
try: try:
# send already available data # send already available data
r = processed_agent_data.select() r = processed_agent_data.select().order_by(processed_agent_data.c.timestamp)
stored_data = SessionLocal().execute(r).fetchall() stored_data = SessionLocal().execute(r).fetchall()
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data] jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
@@ -59,24 +57,7 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
# receive forever # receive forever
while True: while True:
data = await websocket.receive_text() await websocket.receive_text()
try:
if (data):
ws_data = WebSocketData.model_validate(json.loads(data))
session = SessionLocal()
update_query = (
processed_agent_data.update()
.where(processed_agent_data.c.id == ws_data.id)
.values(visible=False)
).returning(processed_agent_data)
res = session.execute(update_query).fetchone()
if (not res):
session.rollback()
raise Exception("Error while websocket PUT")
session.commit()
finally:
session.close()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions.remove(websocket) subscriptions.remove(websocket)
@@ -100,7 +81,6 @@ def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
"latitude": item.agent_data.gps.latitude, "latitude": item.agent_data.gps.latitude,
"longitude": item.agent_data.gps.longitude, "longitude": item.agent_data.gps.longitude,
"timestamp": item.agent_data.timestamp, "timestamp": item.agent_data.timestamp,
"visible": True,
} }
for item in data for item in data
] ]
@@ -116,7 +96,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
created_records = [dict(row._mapping) for row in result.fetchall()] created_records = [dict(row._mapping) for row in result.fetchall()]
session.commit() session.commit()
for record in created_records: for record in sorted(created_records, key = lambda x: x['timestamp']):
await send_data_to_subscribers(jsonable_encoder(record)) await send_data_to_subscribers(jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:

View File

@@ -49,7 +49,3 @@ class AgentData(BaseModel):
class ProcessedAgentData(BaseModel): class ProcessedAgentData(BaseModel):
road_state: str road_state: str
agent_data: AgentData agent_data: AgentData
class WebSocketData(BaseModel):
id: int

View File

@@ -1,4 +1,5 @@
import sys import sys
import os
print("Checking for dead containers...") print("Checking for dead containers...")
@@ -14,6 +15,9 @@ for i in statuses:
if not i[status_index:].startswith("Up "): if not i[status_index:].startswith("Up "):
service_name = i[name_index:] service_name = i[name_index:]
print(f"Crash detected in {service_name}") print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1 exit_code = 1
sys.exit(exit_code) sys.exit(exit_code)