Compare commits
23 Commits
312177e087
...
d215e527ed
| Author | SHA1 | Date | |
|---|---|---|---|
| d215e527ed | |||
| 340706c7e5 | |||
| ff502c3be8 | |||
| ad976c8074 | |||
| 91cca10a04 | |||
| dbd6e88de8 | |||
| eee4743d4d | |||
| b3162e11bd | |||
| 12f6b9871f | |||
| ceb9037dc4 | |||
| 2935a9349d | |||
| b772848d4c | |||
| 722d45b2a1 | |||
| ad07896ba3 | |||
| 4ff31c0884 | |||
| de8d2516f6 | |||
| c291f150af | |||
| 2cdd8d1bd0 | |||
| 9276239d41 | |||
| a9673c1070 | |||
| 9a4aeec6cb | |||
| d1be717490 | |||
| b330180909 |
@ -1,23 +0,0 @@
|
|||||||
# IDEs
|
|
||||||
.idea/
|
|
||||||
.vscode/
|
|
||||||
.git/
|
|
||||||
.gitignore
|
|
||||||
.dockerignore
|
|
||||||
.DS_Store
|
|
||||||
Thumbs.db
|
|
||||||
|
|
||||||
# Python
|
|
||||||
**/__pycache__/
|
|
||||||
**/*.py[cod]
|
|
||||||
**/*.pyo
|
|
||||||
**/*.pyd
|
|
||||||
venv/
|
|
||||||
.env
|
|
||||||
|
|
||||||
# Logs & Database & Broker data
|
|
||||||
*.log
|
|
||||||
**/mosquitto/data/
|
|
||||||
**/mosquitto/log/
|
|
||||||
**/postgres_data/
|
|
||||||
**/pgadmin-data/
|
|
||||||
29
.gitea/workflows/testing.yaml
Normal file
29
.gitea/workflows/testing.yaml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
name: Test Agent
|
||||||
|
on: [push, workflow_dispatch]
|
||||||
|
|
||||||
|
concurrency:
|
||||||
|
cancel-in-progress: false
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test-agent-run:
|
||||||
|
runs-on: arch-x86_64
|
||||||
|
steps:
|
||||||
|
- name: Fetch the repository
|
||||||
|
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
- name: Build containers
|
||||||
|
run: docker-compose -f docker-compose-test.yaml build
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start MQTT broker
|
||||||
|
run: docker-compose -f docker-compose-test.yaml up -d mqtt
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start agent
|
||||||
|
run: docker-compose -f docker-compose-test.yaml run fake_agent
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Clean up
|
||||||
|
if: always()
|
||||||
|
run: docker-compose -f docker-compose-test.yaml down
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
27
.gitignore
vendored
27
.gitignore
vendored
@ -1,25 +1,2 @@
|
|||||||
# IDEs
|
agent/docker/mosquitto/data/
|
||||||
.idea/
|
agent/docker/mosquitto/log/
|
||||||
.vscode/
|
|
||||||
*.swp
|
|
||||||
*.swo
|
|
||||||
|
|
||||||
# Python
|
|
||||||
venv/
|
|
||||||
__pycache__/
|
|
||||||
*.py[cod]
|
|
||||||
*$py.class
|
|
||||||
.env
|
|
||||||
|
|
||||||
# Logs
|
|
||||||
*.log
|
|
||||||
app.log
|
|
||||||
|
|
||||||
# Database & Broker data
|
|
||||||
**/mosquitto/data/
|
|
||||||
**/mosquitto/log/
|
|
||||||
**/postgres_data/
|
|
||||||
**/pgadmin-data/
|
|
||||||
|
|
||||||
# OS specific
|
|
||||||
.DS_Store
|
|
||||||
3
MapView/.gitignore
vendored
Normal file
3
MapView/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
.idea
|
||||||
|
venv
|
||||||
|
__pycache__
|
||||||
2
agent/.gitignore
vendored
Normal file
2
agent/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
__pycache__
|
||||||
@ -3,10 +3,14 @@ FROM python:latest
|
|||||||
# set the working directory in the container
|
# set the working directory in the container
|
||||||
WORKDIR /usr/agent
|
WORKDIR /usr/agent
|
||||||
# copy the dependencies file to the working directory
|
# copy the dependencies file to the working directory
|
||||||
COPY agent/requirements.txt .
|
COPY requirements.txt .
|
||||||
# install dependencies
|
# install dependencies
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install -r requirements.txt
|
||||||
# copy the content of the local src directory to the working directory
|
# copy the content of the local src directory to the working directory
|
||||||
COPY agent/src/ .
|
COPY src/ .
|
||||||
# command to run on container start
|
# command to run on container start
|
||||||
CMD ["python", "main.py"]
|
CMD ["python", "main.py"]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
34
agent/docker/docker-compose-test.yaml
Normal file
34
agent/docker/docker-compose-test.yaml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
version: "3.3"
|
||||||
|
#name: "road_vision"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 9001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
fake_agent:
|
||||||
|
container_name: agent
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
|
DELAY: 0.1
|
||||||
|
MAX_SENDS: 30
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
@ -1,4 +1,5 @@
|
|||||||
name: "road_vision"
|
version: "3.3"
|
||||||
|
#name: "road_vision"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
image: eclipse-mosquitto
|
image: eclipse-mosquitto
|
||||||
@ -16,9 +17,7 @@ services:
|
|||||||
|
|
||||||
fake_agent:
|
fake_agent:
|
||||||
container_name: agent
|
container_name: agent
|
||||||
build:
|
build: ../
|
||||||
context: ../../
|
|
||||||
dockerfile: agent/Dockerfile
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
@ -16,3 +16,6 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
|
|||||||
|
|
||||||
# Delay for sending data to mqtt in seconds
|
# Delay for sending data to mqtt in seconds
|
||||||
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
||||||
|
|
||||||
|
# Testing switches for CI/CD
|
||||||
|
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))
|
||||||
|
|||||||
@ -1,22 +0,0 @@
|
|||||||
longitude,latitude,empty_count
|
|
||||||
50.450386085935094,30.524547100067142,10
|
|
||||||
50.450386085935094,30.524547100067142,11
|
|
||||||
50.450386085935094,30.524547100067142,13
|
|
||||||
50.450386085935094,30.524547100067142,15
|
|
||||||
50.450386085935094,30.524547100067142,7
|
|
||||||
50.450386085935094,30.524547100067142,9
|
|
||||||
50.450386085935094,30.524547100067142,4
|
|
||||||
50.450386085935094,30.524547100067142,0
|
|
||||||
50.450386085935094,30.524547100067142,0
|
|
||||||
50.450386085935094,30.524547100067142,3
|
|
||||||
50.450386085935094,30.524547100067142,4
|
|
||||||
50.450069433207545,30.52406822530458,16
|
|
||||||
50.450069433207545,30.52406822530458,20
|
|
||||||
50.450069433207545,30.52406822530458,25
|
|
||||||
50.450069433207545,30.52406822530458,30
|
|
||||||
50.450069433207545,30.52406822530458,29
|
|
||||||
50.450069433207545,30.52406822530458,12
|
|
||||||
50.450069433207545,30.52406822530458,10
|
|
||||||
50.450069433207545,30.52406822530458,14
|
|
||||||
50.450069433207545,30.52406822530458,3
|
|
||||||
50.450069433207545,30.52406822530458,2
|
|
||||||
|
@ -1,16 +1,13 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.parking import Parking
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class AggregatedData:
|
class AggregatedData:
|
||||||
accelerometer: Accelerometer
|
accelerometer: Accelerometer
|
||||||
gps: Gps
|
gps: Gps
|
||||||
parking: Parking
|
|
||||||
timestamp: datetime
|
timestamp: datetime
|
||||||
user_id: int
|
user_id: int
|
||||||
|
|||||||
@ -1,9 +0,0 @@
|
|||||||
from dataclasses import dataclass
|
|
||||||
|
|
||||||
from domain.gps import Gps
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class Parking:
|
|
||||||
empty_count: int
|
|
||||||
gps: Gps
|
|
||||||
@ -4,7 +4,6 @@ from datetime import datetime
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
|
|
||||||
from domain.parking import Parking
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
@ -13,22 +12,12 @@ import config
|
|||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
|
|
||||||
def __init__(
|
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
|
||||||
self,
|
|
||||||
accelerometer_filename: str,
|
|
||||||
gps_filename: str,
|
|
||||||
park_filename: str,
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
self.accelerometer_filename = accelerometer_filename
|
self.accelerometer_filename = accelerometer_filename
|
||||||
self.park_filename = park_filename
|
|
||||||
self.gps_filename = gps_filename
|
self.gps_filename = gps_filename
|
||||||
|
|
||||||
self._park_f = None
|
|
||||||
self._acc_f = None
|
self._acc_f = None
|
||||||
self._gps_f = None
|
self._gps_f = None
|
||||||
|
|
||||||
self._park_reader: Optional[csv.reader] = None
|
|
||||||
self._acc_reader: Optional[csv.reader] = None
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
self._gps_reader: Optional[csv.reader] = None
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
@ -41,8 +30,6 @@ class FileDatasource:
|
|||||||
|
|
||||||
if not Path(self.accelerometer_filename).exists():
|
if not Path(self.accelerometer_filename).exists():
|
||||||
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
if not Path(self.park_filename).exists():
|
|
||||||
raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}")
|
|
||||||
if not Path(self.gps_filename).exists():
|
if not Path(self.gps_filename).exists():
|
||||||
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
@ -60,11 +47,9 @@ class FileDatasource:
|
|||||||
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
acc_row = self._get_next_row(self._acc_reader, source="acc")
|
acc_row = self._get_next_row(self._acc_reader, source="acc")
|
||||||
park_row = self._get_next_row(self._park_reader, source="park")
|
|
||||||
gps_row = self._get_next_row(self._gps_reader, source="gps")
|
gps_row = self._get_next_row(self._gps_reader, source="gps")
|
||||||
|
|
||||||
acc = self._parse_acc(acc_row)
|
acc = self._parse_acc(acc_row)
|
||||||
park = self._parse_park(park_row)
|
|
||||||
gps = self._parse_gps(gps_row)
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
@ -74,7 +59,6 @@ class FileDatasource:
|
|||||||
return AggregatedData(
|
return AggregatedData(
|
||||||
accelerometer=acc,
|
accelerometer=acc,
|
||||||
gps=gps,
|
gps=gps,
|
||||||
parking=park,
|
|
||||||
timestamp=datetime.utcnow(),
|
timestamp=datetime.utcnow(),
|
||||||
user_id=config.USER_ID,
|
user_id=config.USER_ID,
|
||||||
)
|
)
|
||||||
@ -85,17 +69,14 @@ class FileDatasource:
|
|||||||
self._close_files()
|
self._close_files()
|
||||||
|
|
||||||
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
|
|
||||||
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
|
||||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
|
||||||
# File pointer is already at 0 right after open(), so no need to rewind here.
|
# File pointer is already at 0 right after open(), so no need to rewind here.
|
||||||
# Skip header row once.
|
# Skip header row once.
|
||||||
next(self._acc_reader, None)
|
next(self._acc_reader, None)
|
||||||
next(self._park_reader, None)
|
|
||||||
next(self._gps_reader, None)
|
next(self._gps_reader, None)
|
||||||
|
|
||||||
def _close_files(self) -> None:
|
def _close_files(self) -> None:
|
||||||
@ -107,10 +88,8 @@ class FileDatasource:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
self._acc_f = None
|
self._acc_f = None
|
||||||
self._park_f = None
|
|
||||||
self._gps_f = None
|
self._gps_f = None
|
||||||
self._acc_reader = None
|
self._acc_reader = None
|
||||||
self._park_reader = None
|
|
||||||
self._gps_reader = None
|
self._gps_reader = None
|
||||||
|
|
||||||
def _rewind_acc(self) -> None:
|
def _rewind_acc(self) -> None:
|
||||||
@ -127,13 +106,6 @@ class FileDatasource:
|
|||||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
next(self._gps_reader, None) # skip header row
|
next(self._gps_reader, None) # skip header row
|
||||||
|
|
||||||
def _rewind_park(self) -> None:
|
|
||||||
if self._park_f is None:
|
|
||||||
raise RuntimeError("GPS file is not open.")
|
|
||||||
self._park_f.seek(0)
|
|
||||||
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
|
||||||
next(self._park_reader, None) # skip header row
|
|
||||||
|
|
||||||
def _get_next_row(self, reader, source: str) -> List[str]:
|
def _get_next_row(self, reader, source: str) -> List[str]:
|
||||||
"""Get the next valid row from the reader."""
|
"""Get the next valid row from the reader."""
|
||||||
if reader is None:
|
if reader is None:
|
||||||
@ -146,10 +118,6 @@ class FileDatasource:
|
|||||||
if source == "acc":
|
if source == "acc":
|
||||||
self._rewind_acc()
|
self._rewind_acc()
|
||||||
reader = self._acc_reader
|
reader = self._acc_reader
|
||||||
|
|
||||||
elif source == 'park':
|
|
||||||
self._rewind_park()
|
|
||||||
reader = self._park_reader
|
|
||||||
else:
|
else:
|
||||||
self._rewind_gps()
|
self._rewind_gps()
|
||||||
reader = self._gps_reader
|
reader = self._gps_reader
|
||||||
@ -181,16 +149,3 @@ class FileDatasource:
|
|||||||
lon = float(row[0])
|
lon = float(row[0])
|
||||||
lat = float(row[1])
|
lat = float(row[1])
|
||||||
return Gps(longitude=lon, latitude=lat)
|
return Gps(longitude=lon, latitude=lat)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_park(row: List[str]) -> Parking:
|
|
||||||
if len(row) < 2:
|
|
||||||
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
|
||||||
lon = float(row[0])
|
|
||||||
lat = float(row[1])
|
|
||||||
empty_count = int(row[2])
|
|
||||||
|
|
||||||
return Parking(
|
|
||||||
gps=Gps(longitude=lon, latitude=lat),
|
|
||||||
empty_count=empty_count
|
|
||||||
)
|
|
||||||
|
|||||||
@ -22,9 +22,13 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource):
|
def publish(client, topic, datasource, max_sends = None):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
|
|
||||||
|
i = 0
|
||||||
while True:
|
while True:
|
||||||
|
i += 1
|
||||||
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
@ -32,14 +36,18 @@ def publish(client, topic, datasource):
|
|||||||
if status != 0:
|
if status != 0:
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
if max_sends and i >= max_sends:
|
||||||
|
# display test success
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource)
|
publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@ -1,12 +1,10 @@
|
|||||||
from marshmallow import Schema, fields
|
from marshmallow import Schema, fields
|
||||||
from schema.accelerometer_schema import AccelerometerSchema
|
from schema.accelerometer_schema import AccelerometerSchema
|
||||||
from schema.gps_schema import GpsSchema
|
from schema.gps_schema import GpsSchema
|
||||||
from schema.parking_schema import ParkingSchema
|
|
||||||
|
|
||||||
|
|
||||||
class AggregatedDataSchema(Schema):
|
class AggregatedDataSchema(Schema):
|
||||||
accelerometer = fields.Nested(AccelerometerSchema)
|
accelerometer = fields.Nested(AccelerometerSchema)
|
||||||
gps = fields.Nested(GpsSchema)
|
gps = fields.Nested(GpsSchema)
|
||||||
parking = fields.Nested(ParkingSchema)
|
|
||||||
timestamp = fields.DateTime("iso")
|
timestamp = fields.DateTime("iso")
|
||||||
user_id = fields.Int()
|
user_id = fields.Int()
|
||||||
|
|||||||
@ -1,8 +0,0 @@
|
|||||||
from marshmallow import Schema, fields
|
|
||||||
|
|
||||||
from schema.gps_schema import GpsSchema
|
|
||||||
|
|
||||||
|
|
||||||
class ParkingSchema(Schema):
|
|
||||||
gps = fields.Nested(GpsSchema)
|
|
||||||
empty_count = fields.Int()
|
|
||||||
2
edge/.gitignore
vendored
Normal file
2
edge/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
app.log
|
||||||
@ -3,9 +3,9 @@ FROM python:3.9-slim
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY edge/requirements.txt .
|
COPY requirements.txt .
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY edge/. .
|
COPY . .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["python", "main.py"]
|
CMD ["python", "main.py"]
|
||||||
@ -13,13 +13,12 @@ services:
|
|||||||
- 19001:9001
|
- 19001:9001
|
||||||
networks:
|
networks:
|
||||||
mqtt_network:
|
mqtt_network:
|
||||||
|
user: 1000:1000
|
||||||
|
|
||||||
|
|
||||||
edge:
|
edge:
|
||||||
container_name: edge
|
container_name: edge
|
||||||
build:
|
build: ../
|
||||||
context: ../../
|
|
||||||
dockerfile: edge/Dockerfile
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
2
hub/.gitignore
vendored
Normal file
2
hub/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
__pycache__
|
||||||
@ -3,10 +3,9 @@ FROM python:3.9-slim
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY hub/requirements.txt .
|
COPY requirements.txt .
|
||||||
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY hub/. .
|
COPY . .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
@ -1,3 +1,4 @@
|
|||||||
|
version: "3.9"
|
||||||
name: "road_vision__hub"
|
name: "road_vision__hub"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
@ -15,7 +16,7 @@ services:
|
|||||||
|
|
||||||
|
|
||||||
postgres_db:
|
postgres_db:
|
||||||
image: postgres:17
|
image: postgres:latest
|
||||||
container_name: postgres_db
|
container_name: postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
@ -48,9 +49,7 @@ services:
|
|||||||
|
|
||||||
store:
|
store:
|
||||||
container_name: store
|
container_name: store
|
||||||
build:
|
build: ../../store
|
||||||
context: ../../
|
|
||||||
dockerfile: store/Dockerfile
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
@ -78,9 +77,7 @@ services:
|
|||||||
|
|
||||||
hub:
|
hub:
|
||||||
container_name: hub
|
container_name: hub
|
||||||
build:
|
build: ../
|
||||||
context: ../../
|
|
||||||
dockerfile: hub/Dockerfile
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
- redis
|
- redis
|
||||||
|
|||||||
3
store/.gitignore
vendored
Normal file
3
store/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
venv
|
||||||
|
__pycache__
|
||||||
|
.idea
|
||||||
@ -3,10 +3,9 @@ FROM python:latest
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY store/requirements.txt .
|
COPY requirements.txt .
|
||||||
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY store/. .
|
COPY . .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
@ -1,15 +0,0 @@
|
|||||||
from sqlalchemy import MetaData
|
|
||||||
from sqlalchemy import create_engine
|
|
||||||
from sqlalchemy.orm import sessionmaker, declarative_base
|
|
||||||
|
|
||||||
from config import POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB
|
|
||||||
|
|
||||||
|
|
||||||
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
|
|
||||||
engine = create_engine(DATABASE_URL)
|
|
||||||
|
|
||||||
Base = declarative_base()
|
|
||||||
|
|
||||||
metadata = MetaData()
|
|
||||||
|
|
||||||
SessionLocal = sessionmaker(bind=engine)
|
|
||||||
@ -1,7 +1,8 @@
|
|||||||
|
version: "3.9"
|
||||||
name: "road_vision__database"
|
name: "road_vision__database"
|
||||||
services:
|
services:
|
||||||
postgres_db:
|
postgres_db:
|
||||||
image: postgres:17
|
image: postgres:latest
|
||||||
container_name: postgres_db
|
container_name: postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
@ -34,9 +35,7 @@ services:
|
|||||||
|
|
||||||
store:
|
store:
|
||||||
container_name: store
|
container_name: store
|
||||||
build:
|
build: ..
|
||||||
context: ../../
|
|
||||||
dockerfile: store/Dockerfile
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
|
|||||||
196
store/main.py
196
store/main.py
@ -1,8 +1,10 @@
|
|||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from typing import Set, Dict, List
|
from typing import Set, Dict, List, Any
|
||||||
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body
|
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
|
create_engine,
|
||||||
|
MetaData,
|
||||||
Table,
|
Table,
|
||||||
Column,
|
Column,
|
||||||
Integer,
|
Integer,
|
||||||
@ -10,14 +12,25 @@ from sqlalchemy import (
|
|||||||
Float,
|
Float,
|
||||||
DateTime,
|
DateTime,
|
||||||
)
|
)
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
from sqlalchemy.sql import select
|
from sqlalchemy.sql import select
|
||||||
|
from datetime import datetime
|
||||||
from database import metadata, SessionLocal
|
from pydantic import BaseModel, field_validator
|
||||||
from schemas import ProcessedAgentData, ProcessedAgentDataInDB
|
from config import (
|
||||||
|
POSTGRES_HOST,
|
||||||
|
POSTGRES_PORT,
|
||||||
|
POSTGRES_DB,
|
||||||
|
POSTGRES_USER,
|
||||||
|
POSTGRES_PASSWORD,
|
||||||
|
)
|
||||||
|
|
||||||
# FastAPI app setup
|
# FastAPI app setup
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
# SQLAlchemy setup
|
||||||
|
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
|
||||||
|
engine = create_engine(DATABASE_URL)
|
||||||
|
metadata = MetaData()
|
||||||
|
# Define the ProcessedAgentData table
|
||||||
processed_agent_data = Table(
|
processed_agent_data = Table(
|
||||||
"processed_agent_data",
|
"processed_agent_data",
|
||||||
metadata,
|
metadata,
|
||||||
@ -31,6 +44,57 @@ processed_agent_data = Table(
|
|||||||
Column("longitude", Float),
|
Column("longitude", Float),
|
||||||
Column("timestamp", DateTime),
|
Column("timestamp", DateTime),
|
||||||
)
|
)
|
||||||
|
SessionLocal = sessionmaker(bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
# SQLAlchemy model
|
||||||
|
class ProcessedAgentDataInDB(BaseModel):
|
||||||
|
id: int
|
||||||
|
road_state: str
|
||||||
|
user_id: int
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
|
||||||
|
# FastAPI models
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
user_id: int
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def check_timestamp(cls, value):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
|
|
||||||
|
|
||||||
# WebSocket subscriptions
|
# WebSocket subscriptions
|
||||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||||
@ -61,36 +125,10 @@ async def send_data_to_subscribers(user_id: int, data):
|
|||||||
|
|
||||||
|
|
||||||
@app.post("/processed_agent_data/")
|
@app.post("/processed_agent_data/")
|
||||||
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
|
async def create_processed_agent_data(data: List[ProcessedAgentData]):
|
||||||
session = SessionLocal()
|
# Insert data to database
|
||||||
try:
|
# Send data to subscribers
|
||||||
created_data = [
|
pass
|
||||||
{
|
|
||||||
"road_state": item.road_state,
|
|
||||||
"user_id": user_id,
|
|
||||||
"x": item.agent_data.accelerometer.x,
|
|
||||||
"y": item.agent_data.accelerometer.y,
|
|
||||||
"z": item.agent_data.accelerometer.z,
|
|
||||||
"latitude": item.agent_data.gps.latitude,
|
|
||||||
"longitude": item.agent_data.gps.longitude,
|
|
||||||
"timestamp": item.agent_data.timestamp,
|
|
||||||
}
|
|
||||||
for item in data
|
|
||||||
]
|
|
||||||
stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data)
|
|
||||||
result = session.execute(stmt)
|
|
||||||
created_records = [dict(row._mapping) for row in result.fetchall()]
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
for record in created_records:
|
|
||||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
|
||||||
return created_records
|
|
||||||
except Exception as err:
|
|
||||||
session.rollback()
|
|
||||||
print(f"Database error: {err}")
|
|
||||||
raise HTTPException(status_code=500, detail="Internal Server Error")
|
|
||||||
finally:
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
|
|
||||||
@app.get(
|
@app.get(
|
||||||
@ -98,34 +136,14 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|||||||
response_model=ProcessedAgentDataInDB,
|
response_model=ProcessedAgentDataInDB,
|
||||||
)
|
)
|
||||||
def read_processed_agent_data(processed_agent_data_id: int):
|
def read_processed_agent_data(processed_agent_data_id: int):
|
||||||
session = SessionLocal()
|
# Get data by id
|
||||||
try:
|
pass
|
||||||
stmt = select(processed_agent_data).where(
|
|
||||||
processed_agent_data.c.id == processed_agent_data_id
|
|
||||||
)
|
|
||||||
res = session.execute(stmt).fetchone()
|
|
||||||
if not res:
|
|
||||||
raise HTTPException(status_code=404, detail="Not found")
|
|
||||||
|
|
||||||
return dict(res._mapping)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB])
|
@app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB])
|
||||||
def list_processed_agent_data():
|
def list_processed_agent_data():
|
||||||
session = SessionLocal()
|
# Get list of data
|
||||||
try:
|
pass
|
||||||
stmt = select(processed_agent_data)
|
|
||||||
res = session.execute(stmt).fetchall()
|
|
||||||
if not res:
|
|
||||||
raise HTTPException(status_code=404, detail="Not found")
|
|
||||||
|
|
||||||
return [dict(r._mapping) for r in res]
|
|
||||||
|
|
||||||
finally:
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
|
|
||||||
@app.put(
|
@app.put(
|
||||||
@ -134,41 +152,7 @@ def list_processed_agent_data():
|
|||||||
)
|
)
|
||||||
def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData):
|
def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData):
|
||||||
# Update data
|
# Update data
|
||||||
session = SessionLocal()
|
pass
|
||||||
|
|
||||||
try:
|
|
||||||
query = select(processed_agent_data).where(
|
|
||||||
processed_agent_data.c.id == processed_agent_data_id
|
|
||||||
)
|
|
||||||
result = session.execute(query).fetchone()
|
|
||||||
|
|
||||||
if not result:
|
|
||||||
raise HTTPException(status_code=404, detail="Data not found")
|
|
||||||
|
|
||||||
update_query = (
|
|
||||||
processed_agent_data.update()
|
|
||||||
.where(processed_agent_data.c.id == processed_agent_data_id)
|
|
||||||
.values(
|
|
||||||
road_state=data.road_state,
|
|
||||||
user_id=data.agent_data.user_id,
|
|
||||||
x=data.agent_data.accelerometer.x,
|
|
||||||
y=data.agent_data.accelerometer.y,
|
|
||||||
z=data.agent_data.accelerometer.z,
|
|
||||||
latitude=data.agent_data.gps.latitude,
|
|
||||||
longitude=data.agent_data.gps.longitude,
|
|
||||||
timestamp=data.agent_data.timestamp,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
session.execute(update_query)
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
updated_result = session.execute(query).fetchone()
|
|
||||||
|
|
||||||
return ProcessedAgentDataInDB(**updated_result._mapping)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
|
|
||||||
@app.delete(
|
@app.delete(
|
||||||
@ -177,28 +161,8 @@ def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAge
|
|||||||
)
|
)
|
||||||
def delete_processed_agent_data(processed_agent_data_id: int):
|
def delete_processed_agent_data(processed_agent_data_id: int):
|
||||||
# Delete by id
|
# Delete by id
|
||||||
session = SessionLocal()
|
pass
|
||||||
|
|
||||||
try:
|
|
||||||
query = select(processed_agent_data).where(
|
|
||||||
processed_agent_data.c.id == processed_agent_data_id
|
|
||||||
)
|
|
||||||
result = session.execute(query).fetchone()
|
|
||||||
|
|
||||||
if not result:
|
|
||||||
raise HTTPException(status_code=404, detail="Data not found")
|
|
||||||
|
|
||||||
delete_query = processed_agent_data.delete().where(
|
|
||||||
processed_agent_data.c.id == processed_agent_data_id
|
|
||||||
)
|
|
||||||
|
|
||||||
session.execute(delete_query)
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
return ProcessedAgentDataInDB(**result._mapping)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
session.close()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|||||||
Binary file not shown.
@ -1,51 +0,0 @@
|
|||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from pydantic import BaseModel, field_validator
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessedAgentDataInDB(BaseModel):
|
|
||||||
id: int
|
|
||||||
road_state: str
|
|
||||||
user_id: int
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
|
|
||||||
# FastAPI models
|
|
||||||
class AccelerometerData(BaseModel):
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
|
|
||||||
|
|
||||||
class GpsData(BaseModel):
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
|
|
||||||
|
|
||||||
class AgentData(BaseModel):
|
|
||||||
user_id: int
|
|
||||||
accelerometer: AccelerometerData
|
|
||||||
gps: GpsData
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@field_validator("timestamp", mode="before")
|
|
||||||
def check_timestamp(cls, value):
|
|
||||||
if isinstance(value, datetime):
|
|
||||||
return value
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessedAgentData(BaseModel):
|
|
||||||
road_state: str
|
|
||||||
agent_data: AgentData
|
|
||||||
Loading…
x
Reference in New Issue
Block a user