diff --git a/store/main.py b/store/main.py index 272a646..3edd1af 100644 --- a/store/main.py +++ b/store/main.py @@ -2,6 +2,7 @@ import asyncio import json from typing import Set, Dict, List, Any from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body +from fastapi.encoders import jsonable_encoder from sqlalchemy import ( create_engine, MetaData, @@ -126,9 +127,34 @@ async def send_data_to_subscribers(user_id: int, data): @app.post("/processed_agent_data/") async def create_processed_agent_data(data: List[ProcessedAgentData]): - # Insert data to database - # Send data to subscribers - pass + session = SessionLocal() + try: + created_data = [ + { + "road_state": item.road_state, + "x": item.agent_data.accelerometer.x, + "y": item.agent_data.accelerometer.y, + "z": item.agent_data.accelerometer.z, + "latitude": item.agent_data.gps.latitude, + "longitude": item.agent_data.gps.longitude, + "timestamp": item.agent_data.timestamp, + } + for item in data + ] + stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data) + result = session.execute(stmt) + created_records = [dict(row._mapping) for row in result.fetchall()] + session.commit() + + for record in created_records: + await send_data_to_subscribers(jsonable_encoder(record)) + return created_records + except Exception as err: + session.rollback() + print(f"Database error: {err}") + raise HTTPException(status_code=500, detail="Internal Server Error") + finally: + session.close() @app.get(