Compare commits

...

15 Commits

Author SHA1 Message Date
hasslesstech bc0977588e [P] Add second agent in docker-compose.yaml
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Has been cancelled
2026-03-25 22:34:50 +02:00
hasslesstech 3d2b02c0c6 [P] Add GPS file selection to agent 2026-03-25 22:34:50 +02:00
hasslesstech e3b27bbd4c [P] Add TRACK_ID selection in MapView
Component testing / Hub testing (push) Successful in 27s
Component testing / Store testing (push) Successful in 24s
Component testing / Integration smoke testing (push) Failing after 14s
2026-03-25 22:34:15 +02:00
hasslesstech 987e968dd4 [P] Fix wrong row sending order
Component testing / Hub testing (push) Successful in 27s
Component testing / Store testing (push) Successful in 29s
Component testing / Integration smoke testing (push) Successful in 2m33s
2026-03-25 22:27:40 +02:00
hasslesstech db63eb6d79 [P] Improve logging logic in agent
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
hasslesstech 77d6968297 [P] Fix map lat, lon confusion
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
hasslesstech 0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus 65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
hasslesstech 0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko 2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus 38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
hasslesstech c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
hasslesstech bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
hasslesstech a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
12 changed files with 81 additions and 42 deletions
+2
View File
@@ -2,3 +2,5 @@ import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost" STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000 STORE_PORT = os.environ.get("STORE_PORT") or 8000
TRACK_ID = int(os.environ.get("TID") or '1')
+1 -1
View File
@@ -72,8 +72,8 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.latitude,
processed_agent_data.longitude, processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id processed_agent_data.user_id
) )
+3 -3
View File
@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float
+3 -1
View File
@@ -4,6 +4,7 @@ from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock from kivy.clock import Clock
from lineMapLayer import LineMapLayer from lineMapLayer import LineMapLayer
from datasource import Datasource from datasource import Datasource
import config
line_layer_colors = [ line_layer_colors = [
[1, 0, 0, 1], [1, 0, 0, 1],
@@ -32,7 +33,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 5) Clock.schedule_interval(self.update, 0.1)
def update(self, *args): def update(self, *args):
""" """
@@ -87,6 +88,7 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon self.car_markers[user_id].lon = lon
if user_id == config.TRACK_ID:
self.mapview.center_on(lat, lon) self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):
+3 -4
View File
@@ -1,18 +1,17 @@
import os import os
def try_parse(type, value: str): def try_parse(type, value: str):
try: try:
return type(value) return type(value)
except Exception: except Exception:
return None return None
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent" MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
# Delay for sending data to mqtt in seconds # Data-related config
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
DELAY = try_parse(float, os.environ.get("DELAY")) or 1 DELAY = try_parse(float, os.environ.get("DELAY")) or 1
GPS_SOURCE = os.environ.get("GPS_SOURCE") or "data/gps.csv"
+3 -3
View File
@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float
+7 -5
View File
@@ -15,6 +15,7 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row return row
@staticmethod def _parse_acc(self, row: List[str]) -> Accelerometer:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) x = int(row[0]) / self.acc_divisor
y = int(row[1]) y = int(row[1]) / self.acc_divisor
z = int(row[2]) z = int(row[2]) / self.acc_divisor
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
+6 -3
View File
@@ -16,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -29,17 +31,18 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.info(f"Published to {topic}: {msg[:50]}...") logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") logging.error(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource(16384.0, "data/accelerometer.csv", config.GPS_SOURCE, "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)
+3 -3
View File
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Int() x = fields.Float()
y = fields.Int() y = fields.Float()
z = fields.Int() z = fields.Float()
+20 -3
View File
@@ -14,8 +14,7 @@ services:
mqtt_network: mqtt_network:
fake_agent: agent1:
container_name: agent
build: build:
context: . context: .
dockerfile: agent/Dockerfile dockerfile: agent/Dockerfile
@@ -26,7 +25,25 @@ services:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1 DELAY: 1.2
USER_ID: 2
networks:
mqtt_network:
agent2:
build:
context: .
dockerfile: agent/Dockerfile
depends_on:
- mqtt
environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
GPS_SOURCE: "data/route2.csv"
DELAY: 1.0
USER_ID: 3
networks: networks:
mqtt_network: mqtt_network:
+19 -2
View File
@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -12,8 +13,24 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData( return ProcessedAgentData(
road_state="normal", road_state=road_state,
agent_data=agent_data agent_data=agent_data
) )
+10 -13
View File
@@ -33,7 +33,7 @@ processed_agent_data = Table(
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {} subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
if user_id not in subscriptions: subscriptions.add(websocket)
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
await websocket.send_json(json.dumps(jsonable_data)) for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions[user_id].remove(websocket) subscriptions.remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data): async def send_data_to_subscribers(data):
if user_id in subscriptions: for websocket in subscriptions:
for websocket in subscriptions[user_id]: await websocket.send_json(json.dumps([data]))
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -99,8 +96,8 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
created_records = [dict(row._mapping) for row in result.fetchall()] created_records = [dict(row._mapping) for row in result.fetchall()]
session.commit() session.commit()
for record in created_records: for record in sorted(created_records, key = lambda x: x['timestamp']):
await send_data_to_subscribers(user_id, jsonable_encoder(record)) await send_data_to_subscribers(jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()