Files
IoT-Systems/edge/app/usecases/data_processing.py

37 lines
1.1 KiB
Python
Raw Normal View History

2024-02-12 18:21:08 +02:00
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData
# Per-user memory of the previous classification, shared across calls to
# process_agent_data: maps agent_data.user_id -> True when that user's last
# reading was classified as something other than "normal", False otherwise.
# Users with no entry yet are treated as False by the reader.
# NOTE(review): entries are never removed in the code visible here, so the
# dict grows with the number of distinct user_ids - confirm that is acceptable.
_last_detection_state = {}
2024-02-12 18:21:08 +02:00
def process_agent_data(
    agent_data: AgentData,
) -> ProcessedAgentData:
    """
    Classify the road surface for a single agent reading, reporting each
    detection only on its first sample.

    The accelerometer z value is compared against two fixed thresholds:
    below 0.6 is a "pothole", above 1.2 is a "bump", anything else is
    "normal". A non-normal classification is only reported when the previous
    reading from the same user was normal; while a detection persists across
    consecutive readings, every reading after the first is reported as
    "normal". The per-user detection flag in the module-level
    `_last_detection_state` dict is updated on every call.

    Parameters:
        agent_data (AgentData): Agent reading; this function uses its
            `user_id` and `accelerometer.z` attributes and passes the whole
            object through to the result.
    Returns:
        ProcessedAgentData: The reported road state together with the
            original agent data.
    """
    owner = agent_data.user_id
    # Unknown users are treated as "no detection on the previous reading".
    previously_detected = _last_detection_state.get(owner, False)

    vertical = agent_data.accelerometer.z
    if vertical < 0.6:
        classified = "pothole"
    elif vertical > 1.2:
        classified = "bump"
    else:
        classified = "normal"

    # Remember the raw classification (not the reported one) so that a
    # continuing detection keeps suppressing reports until a normal reading.
    _last_detection_state[owner] = classified != "normal"

    # Rising edge only: if the previous reading was already a detection,
    # this one is reported as normal regardless of its classification.
    reported = "normal" if previously_detected else classified

    return ProcessedAgentData(
        road_state=reported,
        agent_data=agent_data,
    )