Compare commits

...

17 Commits

Author SHA1 Message Date
ІМ-24 Владислав Коваленко
92c91c2594 refactor: use try block for error handling and session rollback
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 35s
Component testing / Integration smoke testing (push) Successful in 2m38s
2026-03-26 13:38:04 +00:00
ІМ-24 Владислав Коваленко
2e623c3a93 fix: remove visibility update from PUT endpoint 2026-03-26 13:21:37 +00:00
ІМ-24 Владислав Коваленко
edd2360d88 feat: update visibility field with websocket 2026-03-26 13:16:13 +00:00
ІМ-24 Владислав Коваленко
8e9dcb50c1 feat: add schema for receiving data from websocker 2026-03-26 10:57:10 +00:00
ІМ-24 Владислав Коваленко
e2c3cda458 fix: remove visible from schemas, so only store knows about it 2026-03-25 17:15:28 +00:00
ІМ-24 Владислав Коваленко
780855c93f fix: add visible columnt to .sql init script 2026-03-25 16:50:19 +00:00
ІМ-24 Владислав Коваленко
86d7b1aaf8 feat: add visibility field to database table 2026-03-25 14:12:09 +00:00
ІМ-24 Владислав Коваленко
d627836e47 feat: add visibility field 2026-03-25 14:00:47 +00:00
0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus
65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko
d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko
2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus
38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
10 changed files with 75 additions and 34 deletions

View File

@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float

View File

@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 5) Clock.schedule_interval(self.update, 0.1)
def update(self, *args): def update(self, *args):
""" """

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float

View File

@@ -15,6 +15,7 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row return row
@staticmethod def _parse_acc(self, row: List[str]) -> Accelerometer:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) x = int(row[0]) / self.acc_divisor
y = int(row[1]) y = int(row[1]) / self.acc_divisor
z = int(row[2]) z = int(row[2]) / self.acc_divisor
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -39,7 +39,7 @@ def run():
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Int() x = fields.Float()
y = fields.Int() y = fields.Float()
z = fields.Int() z = fields.Float()

View File

@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -12,8 +13,24 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData( return ProcessedAgentData(
road_state="normal", road_state=road_state,
agent_data=agent_data agent_data=agent_data
) )

View File

@@ -7,5 +7,6 @@ CREATE TABLE processed_agent_data (
z FLOAT, z FLOAT,
latitude FLOAT, latitude FLOAT,
longitude FLOAT, longitude FLOAT,
timestamp TIMESTAMP timestamp TIMESTAMP,
visible BOOLEAN
); );

View File

@@ -8,12 +8,13 @@ from sqlalchemy import (
Integer, Integer,
String, String,
Float, Float,
Boolean,
DateTime, DateTime,
) )
from sqlalchemy.sql import select from sqlalchemy.sql import select
from database import metadata, SessionLocal from database import metadata, SessionLocal
from schemas import ProcessedAgentData, ProcessedAgentDataInDB from schemas import ProcessedAgentData, ProcessedAgentDataInDB, WebSocketData
# FastAPI app setup # FastAPI app setup
app = FastAPI() app = FastAPI()
@@ -30,10 +31,11 @@ processed_agent_data = Table(
Column("latitude", Float), Column("latitude", Float),
Column("longitude", Float), Column("longitude", Float),
Column("timestamp", DateTime), Column("timestamp", DateTime),
Column("visible", Boolean),
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {} subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,10 +43,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
if user_id not in subscriptions: subscriptions.add(websocket)
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
@@ -55,20 +54,37 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
await websocket.send_json(json.dumps(jsonable_data)) for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() data = await websocket.receive_text()
try:
if (data):
ws_data = WebSocketData.model_validate(json.loads(data))
session = SessionLocal()
update_query = (
processed_agent_data.update()
.where(processed_agent_data.c.id == ws_data.id)
.values(visible=False)
).returning(processed_agent_data)
res = session.execute(update_query).fetchone()
if (not res):
session.rollback()
raise Exception("Error while websocket PUT")
session.commit()
finally:
session.close()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions[user_id].remove(websocket) subscriptions.remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data): async def send_data_to_subscribers(data):
if user_id in subscriptions: for websocket in subscriptions:
for websocket in subscriptions[user_id]: await websocket.send_json(json.dumps([data]))
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -84,6 +100,7 @@ def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
"latitude": item.agent_data.gps.latitude, "latitude": item.agent_data.gps.latitude,
"longitude": item.agent_data.gps.longitude, "longitude": item.agent_data.gps.longitude,
"timestamp": item.agent_data.timestamp, "timestamp": item.agent_data.timestamp,
"visible": True,
} }
for item in data for item in data
] ]
@@ -100,7 +117,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit() session.commit()
for record in created_records: for record in created_records:
await send_data_to_subscribers(user_id, jsonable_encoder(record)) await send_data_to_subscribers(jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()

View File

@@ -49,3 +49,7 @@ class AgentData(BaseModel):
class ProcessedAgentData(BaseModel): class ProcessedAgentData(BaseModel):
road_state: str road_state: str
agent_data: AgentData agent_data: AgentData
class WebSocketData(BaseModel):
id: int