diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/IoT-Systems.iml b/.idea/IoT-Systems.iml new file mode 100644 index 0000000..688aa87 --- /dev/null +++ b/.idea/IoT-Systems.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/dbnavigator.xml b/.idea/dbnavigator.xml new file mode 100644 index 0000000..3b61c61 --- /dev/null +++ b/.idea/dbnavigator.xml @@ -0,0 +1,425 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..977601b --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,16 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..df29e35 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..5aef1d1 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/agent/docker/docker-compose.yaml b/agent/docker/docker-compose.yaml index 3c8b1c4..3059c9e 100644 --- a/agent/docker/docker-compose.yaml +++ b/agent/docker/docker-compose.yaml @@ -1,4 +1,3 @@ -version: "3.9" name: "road_vision" services: mqtt: diff --git a/store/.gitignore b/store/.gitignore index 75b0912..f810ad9 100644 --- a/store/.gitignore +++ b/store/.gitignore @@ -1,3 +1,5 @@ venv __pycache__ -.idea \ No newline at end of file +.idea + +.idea/ diff --git a/store/__init__.py b/store/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/store/database.py b/store/database.py new file mode 100644 index 0000000..7d86055 --- /dev/null +++ b/store/database.py @@ -0,0 +1,15 @@ +from sqlalchemy import MetaData +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, declarative_base + +from config import POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB + + +DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" +engine = create_engine(DATABASE_URL) + +Base = declarative_base() + +metadata = MetaData() + +SessionLocal = sessionmaker(bind=engine) diff --git a/store/docker/docker-compose.yaml b/store/docker/docker-compose.yaml index 8f9c32a..dff34e5 100644 --- a/store/docker/docker-compose.yaml +++ b/store/docker/docker-compose.yaml @@ -1,8 +1,7 @@ -version: "3.9" name: "road_vision__database" services: postgres_db: - image: postgres:latest + image: postgres:17 container_name: postgres_db restart: always environment: diff --git a/store/main.py b/store/main.py index 272a646..a5bd8e4 100644 --- a/store/main.py +++ b/store/main.py @@ -1,10 +1,8 @@ -import asyncio import json -from typing import Set, Dict, List, Any +from typing import Set, Dict, List from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body +from fastapi.encoders import jsonable_encoder from sqlalchemy import ( - create_engine, - MetaData, Table, Column, Integer, @@ -12,25 +10,14 @@ from sqlalchemy import ( Float, DateTime, ) -from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import select -from datetime import datetime -from pydantic import BaseModel, field_validator -from config import ( - POSTGRES_HOST, - POSTGRES_PORT, - POSTGRES_DB, - POSTGRES_USER, - POSTGRES_PASSWORD, -) + +from database import metadata, SessionLocal +from schemas import ProcessedAgentData, ProcessedAgentDataInDB # FastAPI app setup app = FastAPI() -# SQLAlchemy setup -DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" -engine = create_engine(DATABASE_URL) -metadata = MetaData() -# Define the ProcessedAgentData table + processed_agent_data = Table( "processed_agent_data", metadata, @@ -44,57 +31,6 @@ processed_agent_data = Table( Column("longitude", Float), Column("timestamp", DateTime), ) -SessionLocal = sessionmaker(bind=engine) - - -# SQLAlchemy model -class ProcessedAgentDataInDB(BaseModel): - id: int - road_state: str - user_id: int - x: float - y: float - z: float - latitude: float - longitude: float - timestamp: datetime - - -# FastAPI models -class AccelerometerData(BaseModel): - x: float - y: float - z: float - - -class GpsData(BaseModel): - latitude: float - longitude: float - - -class AgentData(BaseModel): - user_id: int - accelerometer: AccelerometerData - gps: GpsData - timestamp: datetime - - @classmethod - @field_validator("timestamp", mode="before") - def check_timestamp(cls, value): - if isinstance(value, datetime): - return value - try: - return datetime.fromisoformat(value) - except (TypeError, ValueError): - raise ValueError( - "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)." - ) - - -class ProcessedAgentData(BaseModel): - road_state: str - agent_data: AgentData - # WebSocket subscriptions subscriptions: Dict[int, Set[WebSocket]] = {} @@ -125,10 +61,36 @@ async def send_data_to_subscribers(user_id: int, data): @app.post("/processed_agent_data/") -async def create_processed_agent_data(data: List[ProcessedAgentData]): - # Insert data to database - # Send data to subscribers - pass +async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)): + session = SessionLocal() + try: + created_data = [ + { + "road_state": item.road_state, + "user_id": user_id, + "x": item.agent_data.accelerometer.x, + "y": item.agent_data.accelerometer.y, + "z": item.agent_data.accelerometer.z, + "latitude": item.agent_data.gps.latitude, + "longitude": item.agent_data.gps.longitude, + "timestamp": item.agent_data.timestamp, + } + for item in data + ] + stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data) + result = session.execute(stmt) + created_records = [dict(row._mapping) for row in result.fetchall()] + session.commit() + + for record in created_records: + await send_data_to_subscribers(user_id, jsonable_encoder(record)) + return created_records + except Exception as err: + session.rollback() + print(f"Database error: {err}") + raise HTTPException(status_code=500, detail="Internal Server Error") + finally: + session.close() @app.get( @@ -136,14 +98,34 @@ async def create_processed_agent_data(data: List[ProcessedAgentData]): response_model=ProcessedAgentDataInDB, ) def read_processed_agent_data(processed_agent_data_id: int): - # Get data by id - pass + session = SessionLocal() + try: + stmt = select(processed_agent_data).where( + processed_agent_data.c.id == processed_agent_data_id + ) + res = session.execute(stmt).fetchone() + if not res: + raise HTTPException(status_code=404, detail="Not found") + + return dict(res._mapping) + + finally: + session.close() @app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB]) def list_processed_agent_data(): - # Get list of data - pass + session = SessionLocal() + try: + stmt = select(processed_agent_data) + res = session.execute(stmt).fetchall() + if not res: + raise HTTPException(status_code=404, detail="Not found") + + return [dict(r._mapping) for r in res] + + finally: + session.close() @app.put( @@ -152,7 +134,41 @@ def list_processed_agent_data(): ) def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData): # Update data - pass + session = SessionLocal() + + try: + query = select(processed_agent_data).where( + processed_agent_data.c.id == processed_agent_data_id + ) + result = session.execute(query).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Data not found") + + update_query = ( + processed_agent_data.update() + .where(processed_agent_data.c.id == processed_agent_data_id) + .values( + road_state=data.road_state, + user_id=data.agent_data.user_id, + x=data.agent_data.accelerometer.x, + y=data.agent_data.accelerometer.y, + z=data.agent_data.accelerometer.z, + latitude=data.agent_data.gps.latitude, + longitude=data.agent_data.gps.longitude, + timestamp=data.agent_data.timestamp, + ) + ) + + session.execute(update_query) + session.commit() + + updated_result = session.execute(query).fetchone() + + return ProcessedAgentDataInDB(**updated_result._mapping) + + finally: + session.close() @app.delete( @@ -161,8 +177,28 @@ def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAge ) def delete_processed_agent_data(processed_agent_data_id: int): # Delete by id - pass + session = SessionLocal() + try: + query = select(processed_agent_data).where( + processed_agent_data.c.id == processed_agent_data_id + ) + result = session.execute(query).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Data not found") + + delete_query = processed_agent_data.delete().where( + processed_agent_data.c.id == processed_agent_data_id + ) + + session.execute(delete_query) + session.commit() + + return ProcessedAgentDataInDB(**result._mapping) + + finally: + session.close() if __name__ == "__main__": import uvicorn diff --git a/store/requirements.txt b/store/requirements.txt index f920843..b70b297 100644 Binary files a/store/requirements.txt and b/store/requirements.txt differ diff --git a/store/schemas.py b/store/schemas.py new file mode 100644 index 0000000..3d13dee --- /dev/null +++ b/store/schemas.py @@ -0,0 +1,51 @@ +from datetime import datetime + +from pydantic import BaseModel, field_validator + + +class ProcessedAgentDataInDB(BaseModel): + id: int + road_state: str + user_id: int + x: float + y: float + z: float + latitude: float + longitude: float + timestamp: datetime + + +# FastAPI models +class AccelerometerData(BaseModel): + x: float + y: float + z: float + + +class GpsData(BaseModel): + latitude: float + longitude: float + + +class AgentData(BaseModel): + user_id: int + accelerometer: AccelerometerData + gps: GpsData + timestamp: datetime + + @classmethod + @field_validator("timestamp", mode="before") + def check_timestamp(cls, value): + if isinstance(value, datetime): + return value + try: + return datetime.fromisoformat(value) + except (TypeError, ValueError): + raise ValueError( + "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)." + ) + + +class ProcessedAgentData(BaseModel): + road_state: str + agent_data: AgentData