Compare commits

...

17 Commits

Author SHA1 Message Date
ІМ-24 Владислав Коваленко
92c91c2594 refactor: use try block for error handling and session rollback
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 35s
Component testing / Integration smoke testing (push) Successful in 2m38s
2026-03-26 13:38:04 +00:00
ІМ-24 Владислав Коваленко
2e623c3a93 fix: remove visibility update from PUT endpoint 2026-03-26 13:21:37 +00:00
ІМ-24 Владислав Коваленко
edd2360d88 feat: update visibility field with websocket 2026-03-26 13:16:13 +00:00
ІМ-24 Владислав Коваленко
8e9dcb50c1 feat: add schema for receiving data from websocker 2026-03-26 10:57:10 +00:00
ІМ-24 Владислав Коваленко
e2c3cda458 fix: remove visible from schemas, so only store knows about it 2026-03-25 17:15:28 +00:00
ІМ-24 Владислав Коваленко
780855c93f fix: add visible columnt to .sql init script 2026-03-25 16:50:19 +00:00
ІМ-24 Владислав Коваленко
86d7b1aaf8 feat: add visibility field to database table 2026-03-25 14:12:09 +00:00
ІМ-24 Владислав Коваленко
d627836e47 feat: add visibility field 2026-03-25 14:00:47 +00:00
0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus
65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko
d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko
2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus
38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
10 changed files with 75 additions and 34 deletions

View File

@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass
class Accelerometer:
x: int
y: int
z: int
x: float
y: float
z: float

View File

@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи
"""
self.update()
Clock.schedule_interval(self.update, 5)
Clock.schedule_interval(self.update, 0.1)
def update(self, *args):
"""

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass
class Accelerometer:
x: int
y: int
z: int
x: float
y: float
z: float

View File

@@ -15,6 +15,7 @@ class FileDatasource:
def __init__(
self,
acc_divisor: float,
accelerometer_filename: str,
gps_filename: str,
park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs):
"""Must be called before read()"""
if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
def _parse_acc(self, row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
x = int(row[0]) / self.acc_divisor
y = int(row[1]) / self.acc_divisor
z = int(row[2]) / self.acc_divisor
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -39,7 +39,7 @@ def run():
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema):
x = fields.Int()
y = fields.Int()
z = fields.Int()
x = fields.Float()
y = fields.Float()
z = fields.Float()

View File

@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data(
agent_data: AgentData,
@@ -12,8 +13,24 @@ def process_agent_data(
Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
"""
# Implement it
user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state="normal",
road_state=road_state,
agent_data=agent_data
)

View File

@@ -7,5 +7,6 @@ CREATE TABLE processed_agent_data (
z FLOAT,
latitude FLOAT,
longitude FLOAT,
timestamp TIMESTAMP
timestamp TIMESTAMP,
visible BOOLEAN
);

View File

@@ -8,12 +8,13 @@ from sqlalchemy import (
Integer,
String,
Float,
Boolean,
DateTime,
)
from sqlalchemy.sql import select
from database import metadata, SessionLocal
from schemas import ProcessedAgentData, ProcessedAgentDataInDB
from schemas import ProcessedAgentData, ProcessedAgentDataInDB, WebSocketData
# FastAPI app setup
app = FastAPI()
@@ -30,10 +31,11 @@ processed_agent_data = Table(
Column("latitude", Float),
Column("longitude", Float),
Column("timestamp", DateTime),
Column("visible", Boolean),
)
# WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {}
subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint
@@ -41,10 +43,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept()
if user_id not in subscriptions:
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
subscriptions.add(websocket)
try:
# send already available data
@@ -55,20 +54,37 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
await websocket.send_json(json.dumps(jsonable_data))
for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever
while True:
await websocket.receive_text()
data = await websocket.receive_text()
try:
if (data):
ws_data = WebSocketData.model_validate(json.loads(data))
session = SessionLocal()
update_query = (
processed_agent_data.update()
.where(processed_agent_data.c.id == ws_data.id)
.values(visible=False)
).returning(processed_agent_data)
res = session.execute(update_query).fetchone()
if (not res):
session.rollback()
raise Exception("Error while websocket PUT")
session.commit()
finally:
session.close()
except WebSocketDisconnect:
subscriptions[user_id].remove(websocket)
subscriptions.remove(websocket)
# Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data):
if user_id in subscriptions:
for websocket in subscriptions[user_id]:
await websocket.send_json(json.dumps(data))
async def send_data_to_subscribers(data):
for websocket in subscriptions:
await websocket.send_json(json.dumps([data]))
# FastAPI CRUDL endpoints
@@ -84,6 +100,7 @@ def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
"latitude": item.agent_data.gps.latitude,
"longitude": item.agent_data.gps.longitude,
"timestamp": item.agent_data.timestamp,
"visible": True,
}
for item in data
]
@@ -100,7 +117,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit()
for record in created_records:
await send_data_to_subscribers(user_id, jsonable_encoder(record))
await send_data_to_subscribers(jsonable_encoder(record))
return created_records
except Exception as err:
session.rollback()

View File

@@ -49,3 +49,7 @@ class AgentData(BaseModel):
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData
class WebSocketData(BaseModel):
id: int