Compare commits
73 Commits
lab1_shmul
...
16b8fe9d56
| Author | SHA1 | Date | |
|---|---|---|---|
| 16b8fe9d56 | |||
| 60a846d8b8 | |||
| fe6bb6ab3a | |||
|
|
30f81ec1ae | ||
| 1b6f47fa0d | |||
| b1e6ad7c94 | |||
|
|
1eddfd966b | ||
| 8af68d6dd9 | |||
| 63aca15824 | |||
| ee509f72a4 | |||
| da9fe69d4e | |||
| 1c856dca0e | |||
|
|
17738d07fe | ||
|
|
6b5831ff1b | ||
|
|
54505db70e | ||
|
|
6f4b3b0ea6 | ||
|
|
948a936a1f | ||
|
|
87facff668 | ||
|
|
294ed5958e | ||
|
|
cbdf81c028 | ||
|
|
a98223cbb0 | ||
|
|
0bd3eaa91d | ||
|
|
9bdb98c19b | ||
|
|
c3b71daaeb | ||
|
|
31c760f4a8 | ||
|
|
24aeb1a19f | ||
|
|
4a81434cb6 | ||
|
|
ceffcfeac2 | ||
|
|
312177e087 | ||
|
|
a52da042ef | ||
|
|
11c590cf25 | ||
|
|
550d29c48c | ||
|
|
8a1327b10a | ||
|
|
db1b7cc6fc | ||
|
|
a899ef6a6e | ||
|
|
95176ea467 | ||
|
|
081a2d4240 | ||
|
|
92c20ef612 | ||
|
|
c31363aa57 | ||
|
|
f96930a259 | ||
|
|
87df394352 | ||
|
|
10ad9774a7 | ||
|
|
b730dbb74c | ||
|
|
e4e585b9ac | ||
|
|
185b0aae58 | ||
|
|
af94c007a2 | ||
|
|
f9ef916331 | ||
|
|
3931fa58c1 | ||
|
|
98fb6aa12a | ||
|
|
7ddfb68b02 | ||
| 9473c5a621 | |||
| 953b0bdb9a | |||
|
|
ea9be3fb57 | ||
|
|
f3512e4afb | ||
| f58596ebf7 | |||
| d621390f51 | |||
| e4be6b0a19 | |||
| fe66df9b8c | |||
|
|
69e679eccf | ||
| 3e0b4762ef | |||
| 75613fd4fc | |||
| a25fbfc3ef | |||
| ca790e7306 | |||
| 1643767094 | |||
| 3d94bf3008 | |||
| c5d98d53cd | |||
| 07a0e906d8 | |||
|
|
9bf3741f32 | ||
| c974ac32f6 | |||
|
|
184098b826 | ||
|
|
b2c7427af0 | ||
| 1e7516fe7b | |||
| a63864bcaa |
23
.dockerignore
Normal file
23
.dockerignore
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
# IDEs
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
|
.git/
|
||||||
|
.gitignore
|
||||||
|
.dockerignore
|
||||||
|
.DS_Store
|
||||||
|
Thumbs.db
|
||||||
|
|
||||||
|
# Python
|
||||||
|
**/__pycache__/
|
||||||
|
**/*.py[cod]
|
||||||
|
**/*.pyo
|
||||||
|
**/*.pyd
|
||||||
|
venv/
|
||||||
|
.env
|
||||||
|
|
||||||
|
# Logs & Database & Broker data
|
||||||
|
*.log
|
||||||
|
**/mosquitto/data/
|
||||||
|
**/mosquitto/log/
|
||||||
|
**/postgres_data/
|
||||||
|
**/pgadmin-data/
|
||||||
71
.gitea/workflows/tests.yaml
Normal file
71
.gitea/workflows/tests.yaml
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
name: Component testing
|
||||||
|
on: [push, workflow_dispatch]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
hub-test:
|
||||||
|
name: Hub testing
|
||||||
|
runs-on: host-arch-x86_64
|
||||||
|
steps:
|
||||||
|
- name: Clone repository
|
||||||
|
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
- name: Build Hub testing container
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker build -t local/hub/${{gitea.sha}} -f hub/Dockerfile-test .
|
||||||
|
|
||||||
|
- name: Run Hub tests
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker run --rm -it local/hub/${{gitea.sha}}
|
||||||
|
|
||||||
|
- name: Clean up containers
|
||||||
|
if: ${{always()}}
|
||||||
|
run: docker image rm local/hub/${{gitea.sha}}
|
||||||
|
|
||||||
|
store-test:
|
||||||
|
name: Store testing
|
||||||
|
runs-on: host-arch-x86_64
|
||||||
|
steps:
|
||||||
|
- name: Clone repository
|
||||||
|
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
- name: Build Store testing container
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker build -t local/store/${{gitea.sha}} -f store/Dockerfile-test .
|
||||||
|
|
||||||
|
- name: Run Store tests
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker run --rm -it local/store/${{gitea.sha}}
|
||||||
|
|
||||||
|
- name: Clean up containers
|
||||||
|
if: ${{always()}}
|
||||||
|
run: docker image rm local/store/${{gitea.sha}}
|
||||||
|
|
||||||
|
integration-smoke-test:
|
||||||
|
name: Integration smoke testing
|
||||||
|
runs-on: host-arch-x86_64
|
||||||
|
needs:
|
||||||
|
- hub-test
|
||||||
|
- store-test
|
||||||
|
steps:
|
||||||
|
- name: Clone repository
|
||||||
|
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
#- name: Build all production containers
|
||||||
|
#working-directory: IoT-Systems
|
||||||
|
#run: docker-compose build
|
||||||
|
|
||||||
|
- name: Start all production containers
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker-compose up -d
|
||||||
|
|
||||||
|
- name: Wait for crashes to happen
|
||||||
|
run: sleep 30
|
||||||
|
|
||||||
|
- name: Check for dead containers
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker ps -a | python3 utils/check-up.py
|
||||||
|
|
||||||
|
- name: Clean up
|
||||||
|
if: ${{always()}}
|
||||||
|
working-directory: IoT-Systems
|
||||||
|
run: docker-compose down -v
|
||||||
25
.gitignore
vendored
Normal file
25
.gitignore
vendored
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
# IDEs
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
|
*.swp
|
||||||
|
*.swo
|
||||||
|
|
||||||
|
# Python
|
||||||
|
venv/
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
.env
|
||||||
|
|
||||||
|
# Logs
|
||||||
|
*.log
|
||||||
|
app.log
|
||||||
|
|
||||||
|
# Database & Broker data
|
||||||
|
**/mosquitto/data/
|
||||||
|
**/mosquitto/log/
|
||||||
|
**/postgres_data/
|
||||||
|
**/pgadmin-data/
|
||||||
|
|
||||||
|
# OS specific
|
||||||
|
.DS_Store
|
||||||
3
MapView/.gitignore
vendored
3
MapView/.gitignore
vendored
@@ -1,3 +0,0 @@
|
|||||||
.idea
|
|
||||||
venv
|
|
||||||
__pycache__
|
|
||||||
1423
MapView/data.csv
1423
MapView/data.csv
File diff suppressed because one or more lines are too long
@@ -75,6 +75,7 @@ class Datasource:
|
|||||||
processed_agent_data.latitude,
|
processed_agent_data.latitude,
|
||||||
processed_agent_data.longitude,
|
processed_agent_data.longitude,
|
||||||
processed_agent_data.road_state,
|
processed_agent_data.road_state,
|
||||||
|
processed_agent_data.user_id
|
||||||
)
|
)
|
||||||
for processed_agent_data in processed_agent_data_list
|
for processed_agent_data in processed_agent_data_list
|
||||||
]
|
]
|
||||||
|
|||||||
7
MapView/domain/accelerometer.py
Normal file
7
MapView/domain/accelerometer.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Accelerometer:
|
||||||
|
x: int
|
||||||
|
y: int
|
||||||
|
z: int
|
||||||
50
MapView/fileReader.py
Normal file
50
MapView/fileReader.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
from csv import reader
|
||||||
|
import config
|
||||||
|
from domain.accelerometer import Accelerometer
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class FileReader:
|
||||||
|
def __init__(
|
||||||
|
self, data_filename: str,
|
||||||
|
) -> None:
|
||||||
|
self.file_path = data_filename
|
||||||
|
pass
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
return self.getNextValue()
|
||||||
|
|
||||||
|
def startReading(self, *args, **kwargs):
|
||||||
|
self.file = open(self.file_path, newline='')
|
||||||
|
self.file_reader = reader(self.file, skipinitialspace=True)
|
||||||
|
file_header = next(self.file_reader)
|
||||||
|
|
||||||
|
self.x_idx = file_header.index('X')
|
||||||
|
self.y_idx = file_header.index('Y')
|
||||||
|
self.z_idx = file_header.index('Z')
|
||||||
|
|
||||||
|
def getNextValue(self):
|
||||||
|
while True:
|
||||||
|
row = next(self.file_reader, None)
|
||||||
|
if row is None:
|
||||||
|
self._rewind_file()
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
x = int(row[self.x_idx])
|
||||||
|
y = int(row[self.y_idx])
|
||||||
|
z = int(row[self.z_idx])
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
except Exception as e:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def _rewind_file(self):
|
||||||
|
self.file.seek(0)
|
||||||
|
self.file_reader = reader(self.file)
|
||||||
|
next(self.file_reader)
|
||||||
|
|
||||||
|
def stopReading(self, *args, **kwargs):
|
||||||
|
if self.file:
|
||||||
|
self.file.close()
|
||||||
|
self.file_reader = None
|
||||||
|
|
||||||
|
|
||||||
115
MapView/main.py
115
MapView/main.py
@@ -5,47 +5,140 @@ from kivy.clock import Clock
|
|||||||
from lineMapLayer import LineMapLayer
|
from lineMapLayer import LineMapLayer
|
||||||
from datasource import Datasource
|
from datasource import Datasource
|
||||||
|
|
||||||
|
line_layer_colors = [
|
||||||
|
[1, 0, 0, 1],
|
||||||
|
[1, 0.5, 0, 1],
|
||||||
|
[0, 1, 0, 1],
|
||||||
|
[0, 1, 1, 1],
|
||||||
|
[0, 0, 1, 1],
|
||||||
|
[1, 0, 1, 1],
|
||||||
|
]
|
||||||
|
|
||||||
class MapViewApp(App):
|
class MapViewApp(App):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__()
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
self.mapview = None
|
||||||
|
self.datasource = Datasource(user_id=1)
|
||||||
|
self.line_layers = dict()
|
||||||
|
self.car_markers = dict()
|
||||||
|
|
||||||
# додати необхідні змінні
|
# додати необхідні змінні
|
||||||
|
self.bump_markers = []
|
||||||
|
self.pothole_markers = []
|
||||||
|
|
||||||
def on_start(self):
|
def on_start(self):
|
||||||
"""
|
"""
|
||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
"""
|
"""
|
||||||
|
self.update()
|
||||||
|
Clock.schedule_interval(self.update, 5)
|
||||||
|
|
||||||
def update(self, *args):
|
def update(self, *args):
|
||||||
"""
|
"""
|
||||||
Викликається регулярно для оновлення мапи
|
Викликається регулярно для оновлення мапи
|
||||||
"""
|
"""
|
||||||
|
new_points = self.datasource.get_new_points()
|
||||||
|
|
||||||
|
if not new_points:
|
||||||
|
return
|
||||||
|
|
||||||
def update_car_marker(self, point):
|
for point in new_points:
|
||||||
|
|
||||||
|
lat, lon, road_state, user_id = point
|
||||||
|
|
||||||
|
# Оновлює лінію маршрута
|
||||||
|
if user_id not in self.line_layers:
|
||||||
|
self.line_layers[user_id] = LineMapLayer(color = line_layer_colors[user_id % len(line_layer_colors)])
|
||||||
|
self.mapview.add_layer(self.line_layers[user_id])
|
||||||
|
|
||||||
|
self.line_layers[user_id].add_point((lat, lon))
|
||||||
|
|
||||||
|
# Оновлює маркер маниши
|
||||||
|
self.update_car_marker(lat, lon, user_id)
|
||||||
|
|
||||||
|
# Перевіряємо стан дороги
|
||||||
|
self.check_road_quality(point)
|
||||||
|
|
||||||
|
def check_road_quality(self, point):
|
||||||
|
"""
|
||||||
|
Аналізує дані акселерометра для подальшого визначення
|
||||||
|
та відображення ям та лежачих поліцейських
|
||||||
|
"""
|
||||||
|
if len(point) < 3:
|
||||||
|
return
|
||||||
|
|
||||||
|
lat, lon, road_state, user_id = point
|
||||||
|
|
||||||
|
if road_state == "pothole":
|
||||||
|
self.set_pothole_marker((lat, lon))
|
||||||
|
elif road_state == "bump":
|
||||||
|
self.set_bump_marker((lat, lon))
|
||||||
|
|
||||||
|
def update_car_marker(self, lat, lon, user_id):
|
||||||
"""
|
"""
|
||||||
Оновлює відображення маркера машини на мапі
|
Оновлює відображення маркера машини на мапі
|
||||||
:param point: GPS координати
|
:param point: GPS координати
|
||||||
"""
|
"""
|
||||||
|
if user_id not in self.car_markers:
|
||||||
|
self.car_markers[user_id] = MapMarker(lat=lat, lon=lon, source='./images/car.png')
|
||||||
|
self.mapview.add_marker(self.car_markers[user_id])
|
||||||
|
else:
|
||||||
|
self.car_markers[user_id].lat = lat
|
||||||
|
self.car_markers[user_id].lon = lon
|
||||||
|
|
||||||
|
self.mapview.center_on(lat, lon)
|
||||||
|
|
||||||
def set_pothole_marker(self, point):
|
def set_pothole_marker(self, point):
|
||||||
"""
|
if isinstance(point, dict):
|
||||||
Встановлює маркер для ями
|
lat = point.get("lat")
|
||||||
:param point: GPS координати
|
lon = point.get("lon")
|
||||||
"""
|
else:
|
||||||
|
lat, lon = point
|
||||||
|
|
||||||
|
if lat is None or lon is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
marker = MapMarker(
|
||||||
|
lat=lat,
|
||||||
|
lon=lon,
|
||||||
|
source="images/pothole.png"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.mapview.add_marker(marker)
|
||||||
|
self.pothole_markers.append(marker)
|
||||||
|
|
||||||
def set_bump_marker(self, point):
|
def set_bump_marker(self, point):
|
||||||
"""
|
if isinstance(point, dict):
|
||||||
Встановлює маркер для лежачого поліцейського
|
lat = point.get("lat")
|
||||||
:param point: GPS координати
|
lon = point.get("lon")
|
||||||
"""
|
else:
|
||||||
|
lat, lon = point
|
||||||
|
|
||||||
|
if lat is None or lon is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
marker = MapMarker(
|
||||||
|
lat=lat,
|
||||||
|
lon=lon,
|
||||||
|
source="images/bump.png"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.mapview.add_marker(marker)
|
||||||
|
self.bump_markers.append(marker)
|
||||||
|
|
||||||
|
|
||||||
def build(self):
|
def build(self):
|
||||||
"""
|
"""
|
||||||
Ініціалізує мапу MapView(zoom, lat, lon)
|
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||||
:return: мапу
|
:return: мапу
|
||||||
"""
|
"""
|
||||||
self.mapview = MapView()
|
self.mapview = MapView(
|
||||||
|
zoom=15,
|
||||||
|
lat=50.4501,
|
||||||
|
lon=30.5234
|
||||||
|
)
|
||||||
|
|
||||||
return self.mapview
|
return self.mapview
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
agent/.gitignore
vendored
2
agent/.gitignore
vendored
@@ -1,2 +0,0 @@
|
|||||||
venv
|
|
||||||
__pycache__
|
|
||||||
@@ -3,10 +3,10 @@ FROM python:latest
|
|||||||
# set the working directory in the container
|
# set the working directory in the container
|
||||||
WORKDIR /usr/agent
|
WORKDIR /usr/agent
|
||||||
# copy the dependencies file to the working directory
|
# copy the dependencies file to the working directory
|
||||||
COPY requirements.txt .
|
COPY agent/requirements.txt .
|
||||||
# install dependencies
|
# install dependencies
|
||||||
RUN pip install -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# copy the content of the local src directory to the working directory
|
# copy the content of the local src directory to the working directory
|
||||||
COPY src/ .
|
COPY agent/src/ .
|
||||||
# command to run on container start
|
# command to run on container start
|
||||||
CMD ["python", "main.py"]
|
CMD ["python", "main.py"]
|
||||||
@@ -1,32 +0,0 @@
|
|||||||
name: "road_vision"
|
|
||||||
services:
|
|
||||||
mqtt:
|
|
||||||
image: eclipse-mosquitto
|
|
||||||
container_name: mqtt
|
|
||||||
volumes:
|
|
||||||
- ./mosquitto:/mosquitto
|
|
||||||
- ./mosquitto/data:/mosquitto/data
|
|
||||||
- ./mosquitto/log:/mosquitto/log
|
|
||||||
ports:
|
|
||||||
- 1883:1883
|
|
||||||
- 9001:9001
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
|
|
||||||
|
|
||||||
fake_agent:
|
|
||||||
container_name: agent
|
|
||||||
build: ../
|
|
||||||
depends_on:
|
|
||||||
- mqtt
|
|
||||||
environment:
|
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
|
||||||
MQTT_BROKER_PORT: 1883
|
|
||||||
MQTT_TOPIC: "agent_data_topic"
|
|
||||||
DELAY: 0.1
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
|
|
||||||
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
0
agent/src/__init__.py
Normal file
0
agent/src/__init__.py
Normal file
22
agent/src/data/parking.csv
Normal file
22
agent/src/data/parking.csv
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
longitude,latitude,empty_count
|
||||||
|
50.450386085935094,30.524547100067142,10
|
||||||
|
50.450386085935094,30.524547100067142,11
|
||||||
|
50.450386085935094,30.524547100067142,13
|
||||||
|
50.450386085935094,30.524547100067142,15
|
||||||
|
50.450386085935094,30.524547100067142,7
|
||||||
|
50.450386085935094,30.524547100067142,9
|
||||||
|
50.450386085935094,30.524547100067142,4
|
||||||
|
50.450386085935094,30.524547100067142,0
|
||||||
|
50.450386085935094,30.524547100067142,0
|
||||||
|
50.450386085935094,30.524547100067142,3
|
||||||
|
50.450386085935094,30.524547100067142,4
|
||||||
|
50.450069433207545,30.52406822530458,16
|
||||||
|
50.450069433207545,30.52406822530458,20
|
||||||
|
50.450069433207545,30.52406822530458,25
|
||||||
|
50.450069433207545,30.52406822530458,30
|
||||||
|
50.450069433207545,30.52406822530458,29
|
||||||
|
50.450069433207545,30.52406822530458,12
|
||||||
|
50.450069433207545,30.52406822530458,10
|
||||||
|
50.450069433207545,30.52406822530458,14
|
||||||
|
50.450069433207545,30.52406822530458,3
|
||||||
|
50.450069433207545,30.52406822530458,2
|
||||||
|
@@ -1,13 +1,16 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
|
from domain.parking import Parking
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class AggregatedData:
|
class AggregatedData:
|
||||||
accelerometer: Accelerometer
|
accelerometer: Accelerometer
|
||||||
gps: Gps
|
gps: Gps
|
||||||
|
parking: Parking
|
||||||
timestamp: datetime
|
timestamp: datetime
|
||||||
user_id: int
|
user_id: int
|
||||||
|
|||||||
9
agent/src/domain/parking.py
Normal file
9
agent/src/domain/parking.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from domain.gps import Gps
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Parking:
|
||||||
|
empty_count: int
|
||||||
|
gps: Gps
|
||||||
@@ -1,5 +1,10 @@
|
|||||||
from csv import reader
|
import csv
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from domain.parking import Parking
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
@@ -7,24 +12,185 @@ import config
|
|||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
accelerometer_filename: str,
|
accelerometer_filename: str,
|
||||||
gps_filename: str,
|
gps_filename: str,
|
||||||
|
park_filename: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self) -> AggregatedData:
|
self.accelerometer_filename = accelerometer_filename
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
self.park_filename = park_filename
|
||||||
return AggregatedData(
|
self.gps_filename = gps_filename
|
||||||
Accelerometer(1, 2, 3),
|
|
||||||
Gps(4, 5),
|
self._park_f = None
|
||||||
datetime.now(),
|
self._acc_f = None
|
||||||
config.USER_ID,
|
self._gps_f = None
|
||||||
)
|
|
||||||
|
self._park_reader: Optional[csv.reader] = None
|
||||||
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Must be called before read()"""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not Path(self.accelerometer_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
|
if not Path(self.park_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}")
|
||||||
|
if not Path(self.gps_filename).exists():
|
||||||
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
|
self._open_files()
|
||||||
|
self._started = True
|
||||||
|
|
||||||
def stopReading(self, *args, **kwargs):
|
def stopReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись для закінчення читання даних"""
|
"""Must be called when finishing reading"""
|
||||||
|
self._close_files()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def read(self) -> AggregatedData:
|
||||||
|
"""Return one combined sample (acc + gps)."""
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
|
acc_row = self._get_next_row(self._acc_reader, source="acc")
|
||||||
|
park_row = self._get_next_row(self._park_reader, source="park")
|
||||||
|
gps_row = self._get_next_row(self._gps_reader, source="gps")
|
||||||
|
|
||||||
|
acc = self._parse_acc(acc_row)
|
||||||
|
park = self._parse_park(park_row)
|
||||||
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
|
if config.DELAY and config.DELAY > 0:
|
||||||
|
time.sleep(float(config.DELAY))
|
||||||
|
|
||||||
|
return AggregatedData(
|
||||||
|
accelerometer=acc,
|
||||||
|
gps=gps,
|
||||||
|
parking=park,
|
||||||
|
timestamp=datetime.utcnow(),
|
||||||
|
user_id=config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------- internal ----------------
|
||||||
|
|
||||||
|
def _open_files(self) -> None:
|
||||||
|
self._close_files()
|
||||||
|
|
||||||
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
|
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
|
||||||
|
# File pointer is already at 0 right after open(), so no need to rewind here.
|
||||||
|
# Skip header row once.
|
||||||
|
next(self._acc_reader, None)
|
||||||
|
next(self._park_reader, None)
|
||||||
|
next(self._gps_reader, None)
|
||||||
|
|
||||||
|
def _close_files(self) -> None:
|
||||||
|
for f in (self._acc_f, self._gps_f):
|
||||||
|
try:
|
||||||
|
if f is not None:
|
||||||
|
f.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
self._park_f = None
|
||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader = None
|
||||||
|
self._park_reader = None
|
||||||
|
self._gps_reader = None
|
||||||
|
|
||||||
|
def _rewind_acc(self) -> None:
|
||||||
|
if self._acc_f is None:
|
||||||
|
raise RuntimeError("Accelerometer file is not open.")
|
||||||
|
self._acc_f.seek(0)
|
||||||
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
|
next(self._acc_reader, None) # skip header row
|
||||||
|
|
||||||
|
def _rewind_gps(self) -> None:
|
||||||
|
if self._gps_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
self._gps_f.seek(0)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
next(self._gps_reader, None) # skip header row
|
||||||
|
|
||||||
|
def _rewind_park(self) -> None:
|
||||||
|
if self._park_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
self._park_f.seek(0)
|
||||||
|
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
||||||
|
next(self._park_reader, None) # skip header row
|
||||||
|
|
||||||
|
def _get_next_row(self, reader, source: str) -> List[str]:
|
||||||
|
"""Get the next valid row from the reader."""
|
||||||
|
if reader is None:
|
||||||
|
raise RuntimeError("Reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
row = next(reader, None)
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
if source == "acc":
|
||||||
|
self._rewind_acc()
|
||||||
|
reader = self._acc_reader
|
||||||
|
|
||||||
|
elif source == 'park':
|
||||||
|
self._rewind_park()
|
||||||
|
reader = self._park_reader
|
||||||
|
else:
|
||||||
|
self._rewind_gps()
|
||||||
|
reader = self._gps_reader
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not row or not any(cell.strip() for cell in row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
|
if len(row) < 3:
|
||||||
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
x = int(row[0])
|
||||||
|
y = int(row[1])
|
||||||
|
z = int(row[2])
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||||
|
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_gps(row: List[str]) -> Gps:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
return Gps(longitude=lon, latitude=lat)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_park(row: List[str]) -> Parking:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
empty_count = int(row[2])
|
||||||
|
|
||||||
|
return Parking(
|
||||||
|
gps=Gps(longitude=lon, latitude=lat),
|
||||||
|
empty_count=empty_count
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,6 +1,4 @@
|
|||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||||
from file_datasource import FileDatasource
|
from file_datasource import FileDatasource
|
||||||
import config
|
import config
|
||||||
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource, delay):
|
def publish(client, topic, datasource):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
while True:
|
while True:
|
||||||
time.sleep(delay)
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
# result: [0, 1]
|
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status == 0:
|
if status != 0:
|
||||||
pass
|
|
||||||
# print(f"Send `{msg}` to topic `{topic}`")
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
@@ -44,9 +37,9 @@ def run():
|
|||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
publish(client, config.MQTT_TOPIC, datasource)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
from marshmallow import Schema, fields
|
from marshmallow import Schema, fields
|
||||||
from schema.accelerometer_schema import AccelerometerSchema
|
from schema.accelerometer_schema import AccelerometerSchema
|
||||||
from schema.gps_schema import GpsSchema
|
from schema.gps_schema import GpsSchema
|
||||||
|
from schema.parking_schema import ParkingSchema
|
||||||
|
|
||||||
|
|
||||||
class AggregatedDataSchema(Schema):
|
class AggregatedDataSchema(Schema):
|
||||||
accelerometer = fields.Nested(AccelerometerSchema)
|
accelerometer = fields.Nested(AccelerometerSchema)
|
||||||
gps = fields.Nested(GpsSchema)
|
gps = fields.Nested(GpsSchema)
|
||||||
|
parking = fields.Nested(ParkingSchema)
|
||||||
timestamp = fields.DateTime("iso")
|
timestamp = fields.DateTime("iso")
|
||||||
user_id = fields.Int()
|
user_id = fields.Int()
|
||||||
|
|||||||
8
agent/src/schema/parking_schema.py
Normal file
8
agent/src/schema/parking_schema.py
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
from marshmallow import Schema, fields
|
||||||
|
|
||||||
|
from schema.gps_schema import GpsSchema
|
||||||
|
|
||||||
|
|
||||||
|
class ParkingSchema(Schema):
|
||||||
|
gps = fields.Nested(GpsSchema)
|
||||||
|
empty_count = fields.Int()
|
||||||
@@ -1,13 +1,12 @@
|
|||||||
version: "3.9"
|
name: "road_vision"
|
||||||
name: "road_vision__hub"
|
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
image: eclipse-mosquitto
|
image: eclipse-mosquitto
|
||||||
container_name: mqtt
|
container_name: mqtt
|
||||||
volumes:
|
volumes:
|
||||||
- ./mosquitto:/mosquitto
|
- ./agent/docker/mosquitto:/mosquitto
|
||||||
- ./mosquitto/data:/mosquitto/data
|
- ./agent/docker/mosquitto/data:/mosquitto/data
|
||||||
- ./mosquitto/log:/mosquitto/log
|
- ./agent/docker/mosquitto/log:/mosquitto/log
|
||||||
ports:
|
ports:
|
||||||
- 1883:1883
|
- 1883:1883
|
||||||
- 9001:9001
|
- 9001:9001
|
||||||
@@ -15,8 +14,43 @@ services:
|
|||||||
mqtt_network:
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
fake_agent:
|
||||||
|
container_name: agent
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: agent/Dockerfile
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
|
DELAY: 0.1
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
edge:
|
||||||
|
container_name: edge
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: edge/Dockerfile
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: " "
|
||||||
|
HUB_HOST: "store"
|
||||||
|
HUB_PORT: 8000
|
||||||
|
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||||
|
HUB_MQTT_BROKER_PORT: 1883
|
||||||
|
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
edge_hub:
|
||||||
|
|
||||||
postgres_db:
|
postgres_db:
|
||||||
image: postgres:latest
|
image: postgres:17
|
||||||
container_name: postgres_db
|
container_name: postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
@@ -25,13 +59,12 @@ services:
|
|||||||
POSTGRES_DB: test_db
|
POSTGRES_DB: test_db
|
||||||
volumes:
|
volumes:
|
||||||
- postgres_data:/var/lib/postgresql/data
|
- postgres_data:/var/lib/postgresql/data
|
||||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
- ./store/docker/db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||||
ports:
|
ports:
|
||||||
- "5432:5432"
|
- "5432:5432"
|
||||||
networks:
|
networks:
|
||||||
db_network:
|
db_network:
|
||||||
|
|
||||||
|
|
||||||
pgadmin:
|
pgadmin:
|
||||||
container_name: pgadmin4
|
container_name: pgadmin4
|
||||||
image: dpage/pgadmin4
|
image: dpage/pgadmin4
|
||||||
@@ -49,7 +82,9 @@ services:
|
|||||||
|
|
||||||
store:
|
store:
|
||||||
container_name: store
|
container_name: store
|
||||||
build: ../../store
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: store/Dockerfile
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
@@ -77,7 +112,9 @@ services:
|
|||||||
|
|
||||||
hub:
|
hub:
|
||||||
container_name: hub
|
container_name: hub
|
||||||
build: ../
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: hub/Dockerfile
|
||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
- redis
|
- redis
|
||||||
@@ -90,7 +127,7 @@ services:
|
|||||||
MQTT_BROKER_HOST: "mqtt"
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
MQTT_BROKER_PORT: 1883
|
MQTT_BROKER_PORT: 1883
|
||||||
MQTT_TOPIC: "processed_data_topic"
|
MQTT_TOPIC: "processed_data_topic"
|
||||||
BATCH_SIZE: 1
|
BATCH_SIZE: 20
|
||||||
ports:
|
ports:
|
||||||
- "9000:8000"
|
- "9000:8000"
|
||||||
networks:
|
networks:
|
||||||
@@ -98,10 +135,11 @@ services:
|
|||||||
hub_store:
|
hub_store:
|
||||||
hub_redis:
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
mqtt_network:
|
mqtt_network:
|
||||||
db_network:
|
db_network:
|
||||||
|
edge_hub:
|
||||||
|
hub:
|
||||||
hub_store:
|
hub_store:
|
||||||
hub_redis:
|
hub_redis:
|
||||||
|
|
||||||
2
edge/.gitignore
vendored
2
edge/.gitignore
vendored
@@ -1,2 +0,0 @@
|
|||||||
venv
|
|
||||||
app.log
|
|
||||||
@@ -3,9 +3,9 @@ FROM python:3.9-slim
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY requirements.txt .
|
COPY edge/requirements.txt .
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY . .
|
COPY edge/. .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["python", "main.py"]
|
CMD ["python", "main.py"]
|
||||||
@@ -1,48 +0,0 @@
|
|||||||
version: "3.9"
|
|
||||||
# name: "road_vision"
|
|
||||||
services:
|
|
||||||
mqtt:
|
|
||||||
image: eclipse-mosquitto
|
|
||||||
container_name: mqtt
|
|
||||||
volumes:
|
|
||||||
- ./mosquitto:/mosquitto
|
|
||||||
- ./mosquitto/data:/mosquitto/data
|
|
||||||
- ./mosquitto/log:/mosquitto/log
|
|
||||||
ports:
|
|
||||||
- 1883:1883
|
|
||||||
- 19001:9001
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
|
|
||||||
|
|
||||||
edge:
|
|
||||||
container_name: edge
|
|
||||||
build: ../
|
|
||||||
depends_on:
|
|
||||||
- mqtt
|
|
||||||
environment:
|
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
|
||||||
MQTT_BROKER_PORT: 1883
|
|
||||||
MQTT_TOPIC: " "
|
|
||||||
HUB_HOST: "store"
|
|
||||||
HUB_PORT: 8000
|
|
||||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
|
||||||
HUB_MQTT_BROKER_PORT: 1883
|
|
||||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
edge_hub:
|
|
||||||
|
|
||||||
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
db_network:
|
|
||||||
edge_hub:
|
|
||||||
hub:
|
|
||||||
hub_store:
|
|
||||||
hub_redis:
|
|
||||||
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
postgres_data:
|
|
||||||
pgadmin-data:
|
|
||||||
2
hub/.gitignore
vendored
2
hub/.gitignore
vendored
@@ -1,2 +0,0 @@
|
|||||||
venv
|
|
||||||
__pycache__
|
|
||||||
@@ -3,9 +3,10 @@ FROM python:3.9-slim
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY requirements.txt .
|
COPY hub/requirements.txt .
|
||||||
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY . .
|
COPY hub/. .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
12
hub/Dockerfile-test
Normal file
12
hub/Dockerfile-test
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:3.9-slim
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY hub/requirements.txt .
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY hub/. .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
CMD ["./test-entry.sh"]
|
||||||
@@ -13,12 +13,39 @@ class StoreApiAdapter(StoreGateway):
|
|||||||
def __init__(self, api_base_url):
|
def __init__(self, api_base_url):
|
||||||
self.api_base_url = api_base_url
|
self.api_base_url = api_base_url
|
||||||
|
|
||||||
|
def processed_agent_data_batch_to_payload(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||||
|
if not processed_agent_data_batch:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Extract user_id from the first element
|
||||||
|
user_id = processed_agent_data_batch[0].agent_data.user_id
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"data": [item.model_dump(mode='json') for item in processed_agent_data_batch],
|
||||||
|
"user_id": user_id
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload
|
||||||
|
|
||||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||||
"""
|
payload = self.processed_agent_data_batch_to_payload(processed_agent_data_batch)
|
||||||
Save the processed road data to the Store API.
|
|
||||||
Parameters:
|
if payload == False:
|
||||||
processed_agent_data_batch (dict): Processed road data to be saved.
|
return False
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
try:
|
||||||
"""
|
# Perform a POST request to the Store API with a 10-second timeout
|
||||||
# Implement it
|
response = requests.post(
|
||||||
|
f"{self.api_base_url}/processed_agent_data/",
|
||||||
|
json=payload,
|
||||||
|
timeout=10
|
||||||
|
)
|
||||||
|
if response.status_code == 200:
|
||||||
|
logging.info(f"Batch of {len(processed_agent_data_batch)} items sent to Store.")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logging.error(f"Store API error: {response.status_code} - {response.text}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to send data to Store: {e}")
|
||||||
|
return False
|
||||||
|
|||||||
41
hub/app/adapters/store_api_adapter_test.py
Normal file
41
hub/app/adapters/store_api_adapter_test.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||||
|
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
def _test_processed_agent_data_batch_to_payload():
|
||||||
|
processed_data_batch = [
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 1,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 2,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 3,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
res = StoreApiAdapter(None).processed_agent_data_batch_to_payload(processed_data_batch)
|
||||||
|
|
||||||
|
assert res["data"][0]["agent_data"]["user_id"] == 1
|
||||||
|
assert res["data"][1]["agent_data"]["user_id"] == 2
|
||||||
|
assert res["data"][2]["agent_data"]["user_id"] == 3
|
||||||
|
|
||||||
|
assert StoreApiAdapter(None).processed_agent_data_batch_to_payload([]) == False
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_functions = [i for i in dir() if i.startswith('_test_')]
|
||||||
|
|
||||||
|
for i in test_functions:
|
||||||
|
print(i)
|
||||||
|
eval(i)()
|
||||||
14
hub/main.py
14
hub/main.py
@@ -70,18 +70,20 @@ def on_message(client, userdata, msg):
|
|||||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
payload, strict=True
|
payload, strict=True
|
||||||
)
|
)
|
||||||
|
|
||||||
redis_client.lpush(
|
redis_client.lpush(
|
||||||
"processed_agent_data", processed_agent_data.model_dump_json()
|
"processed_agent_data", processed_agent_data.model_dump_json()
|
||||||
)
|
)
|
||||||
processed_agent_data_batch: List[ProcessedAgentData] = []
|
|
||||||
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||||
|
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||||
for _ in range(BATCH_SIZE):
|
for _ in range(BATCH_SIZE):
|
||||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
raw_data = redis_client.lpop("processed_agent_data")
|
||||||
redis_client.lpop("processed_agent_data")
|
if raw_data:
|
||||||
)
|
data_item = ProcessedAgentData.model_validate_json(raw_data)
|
||||||
processed_agent_data_batch.append(processed_agent_data)
|
processed_agent_data_batch.append(data_item)
|
||||||
|
|
||||||
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||||
|
|
||||||
return {"status": "ok"}
|
return {"status": "ok"}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.info(f"Error processing MQTT message: {e}")
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|||||||
3
hub/test-entry.sh
Executable file
3
hub/test-entry.sh
Executable file
@@ -0,0 +1,3 @@
|
|||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
PYTHONPATH=$PWD python3 app/adapters/store_api_adapter_test.py
|
||||||
3
store/.gitignore
vendored
3
store/.gitignore
vendored
@@ -1,3 +0,0 @@
|
|||||||
venv
|
|
||||||
__pycache__
|
|
||||||
.idea
|
|
||||||
@@ -3,9 +3,10 @@ FROM python:latest
|
|||||||
# Set the working directory inside the container
|
# Set the working directory inside the container
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
# Copy the requirements.txt file and install dependencies
|
# Copy the requirements.txt file and install dependencies
|
||||||
COPY requirements.txt .
|
COPY store/requirements.txt .
|
||||||
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
# Copy the entire application into the container
|
# Copy the entire application into the container
|
||||||
COPY . .
|
COPY store/. .
|
||||||
# Run the main.py script inside the container when it starts
|
# Run the main.py script inside the container when it starts
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
13
store/Dockerfile-test
Normal file
13
store/Dockerfile-test
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:latest
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY store/requirements.txt .
|
||||||
|
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY store/. .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
#CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
|
CMD ["./test-entry.sh"]
|
||||||
0
store/__init__.py
Normal file
0
store/__init__.py
Normal file
15
store/database.py
Normal file
15
store/database.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
from sqlalchemy import MetaData
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker, declarative_base
|
||||||
|
|
||||||
|
from config import POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB
|
||||||
|
|
||||||
|
|
||||||
|
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
|
||||||
|
engine = create_engine(DATABASE_URL)
|
||||||
|
|
||||||
|
Base = declarative_base()
|
||||||
|
|
||||||
|
metadata = MetaData()
|
||||||
|
|
||||||
|
SessionLocal = sessionmaker(bind=engine)
|
||||||
@@ -1,60 +0,0 @@
|
|||||||
version: "3.9"
|
|
||||||
name: "road_vision__database"
|
|
||||||
services:
|
|
||||||
postgres_db:
|
|
||||||
image: postgres:latest
|
|
||||||
container_name: postgres_db
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: user
|
|
||||||
POSTGRES_PASSWORD: pass
|
|
||||||
POSTGRES_DB: test_db
|
|
||||||
volumes:
|
|
||||||
- postgres_data:/var/lib/postgresql/data
|
|
||||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
|
||||||
ports:
|
|
||||||
- "5432:5432"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
pgadmin:
|
|
||||||
container_name: pgadmin4
|
|
||||||
image: dpage/pgadmin4
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
|
||||||
PGADMIN_DEFAULT_PASSWORD: root
|
|
||||||
volumes:
|
|
||||||
- pgadmin-data:/var/lib/pgadmin
|
|
||||||
ports:
|
|
||||||
- "5050:80"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
store:
|
|
||||||
container_name: store
|
|
||||||
build: ..
|
|
||||||
depends_on:
|
|
||||||
- postgres_db
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: user
|
|
||||||
POSTGRES_PASSWORD: pass
|
|
||||||
POSTGRES_DB: test_db
|
|
||||||
POSTGRES_HOST: postgres_db
|
|
||||||
POSTGRES_PORT: 5432
|
|
||||||
ports:
|
|
||||||
- "8000:8000"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
postgres_data:
|
|
||||||
pgadmin-data:
|
|
||||||
213
store/main.py
213
store/main.py
@@ -1,10 +1,8 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
import json
|
||||||
from typing import Set, Dict, List, Any
|
from typing import Set, Dict, List
|
||||||
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body
|
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body
|
||||||
|
from fastapi.encoders import jsonable_encoder
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
create_engine,
|
|
||||||
MetaData,
|
|
||||||
Table,
|
Table,
|
||||||
Column,
|
Column,
|
||||||
Integer,
|
Integer,
|
||||||
@@ -12,25 +10,14 @@ from sqlalchemy import (
|
|||||||
Float,
|
Float,
|
||||||
DateTime,
|
DateTime,
|
||||||
)
|
)
|
||||||
from sqlalchemy.orm import sessionmaker
|
|
||||||
from sqlalchemy.sql import select
|
from sqlalchemy.sql import select
|
||||||
from datetime import datetime
|
|
||||||
from pydantic import BaseModel, field_validator
|
from database import metadata, SessionLocal
|
||||||
from config import (
|
from schemas import ProcessedAgentData, ProcessedAgentDataInDB
|
||||||
POSTGRES_HOST,
|
|
||||||
POSTGRES_PORT,
|
|
||||||
POSTGRES_DB,
|
|
||||||
POSTGRES_USER,
|
|
||||||
POSTGRES_PASSWORD,
|
|
||||||
)
|
|
||||||
|
|
||||||
# FastAPI app setup
|
# FastAPI app setup
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
# SQLAlchemy setup
|
|
||||||
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
|
|
||||||
engine = create_engine(DATABASE_URL)
|
|
||||||
metadata = MetaData()
|
|
||||||
# Define the ProcessedAgentData table
|
|
||||||
processed_agent_data = Table(
|
processed_agent_data = Table(
|
||||||
"processed_agent_data",
|
"processed_agent_data",
|
||||||
metadata,
|
metadata,
|
||||||
@@ -44,57 +31,6 @@ processed_agent_data = Table(
|
|||||||
Column("longitude", Float),
|
Column("longitude", Float),
|
||||||
Column("timestamp", DateTime),
|
Column("timestamp", DateTime),
|
||||||
)
|
)
|
||||||
SessionLocal = sessionmaker(bind=engine)
|
|
||||||
|
|
||||||
|
|
||||||
# SQLAlchemy model
|
|
||||||
class ProcessedAgentDataInDB(BaseModel):
|
|
||||||
id: int
|
|
||||||
road_state: str
|
|
||||||
user_id: int
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
|
|
||||||
# FastAPI models
|
|
||||||
class AccelerometerData(BaseModel):
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
|
|
||||||
|
|
||||||
class GpsData(BaseModel):
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
|
|
||||||
|
|
||||||
class AgentData(BaseModel):
|
|
||||||
user_id: int
|
|
||||||
accelerometer: AccelerometerData
|
|
||||||
gps: GpsData
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@field_validator("timestamp", mode="before")
|
|
||||||
def check_timestamp(cls, value):
|
|
||||||
if isinstance(value, datetime):
|
|
||||||
return value
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessedAgentData(BaseModel):
|
|
||||||
road_state: str
|
|
||||||
agent_data: AgentData
|
|
||||||
|
|
||||||
|
|
||||||
# WebSocket subscriptions
|
# WebSocket subscriptions
|
||||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||||
@@ -104,10 +40,24 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
|
|||||||
@app.websocket("/ws/{user_id}")
|
@app.websocket("/ws/{user_id}")
|
||||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
|
|
||||||
if user_id not in subscriptions:
|
if user_id not in subscriptions:
|
||||||
subscriptions[user_id] = set()
|
subscriptions[user_id] = set()
|
||||||
|
|
||||||
subscriptions[user_id].add(websocket)
|
subscriptions[user_id].add(websocket)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# send already available data
|
||||||
|
r = processed_agent_data.select()
|
||||||
|
stored_data = SessionLocal().execute(r).fetchall()
|
||||||
|
|
||||||
|
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
|
||||||
|
for i in jsonable_data:
|
||||||
|
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
|
await websocket.send_json(json.dumps(jsonable_data))
|
||||||
|
|
||||||
|
# receive forever
|
||||||
while True:
|
while True:
|
||||||
await websocket.receive_text()
|
await websocket.receive_text()
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
@@ -123,12 +73,41 @@ async def send_data_to_subscribers(user_id: int, data):
|
|||||||
|
|
||||||
# FastAPI CRUDL endpoints
|
# FastAPI CRUDL endpoints
|
||||||
|
|
||||||
|
def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"road_state": item.road_state,
|
||||||
|
"user_id": item.agent_data.user_id,
|
||||||
|
"x": item.agent_data.accelerometer.x,
|
||||||
|
"y": item.agent_data.accelerometer.y,
|
||||||
|
"z": item.agent_data.accelerometer.z,
|
||||||
|
"latitude": item.agent_data.gps.latitude,
|
||||||
|
"longitude": item.agent_data.gps.longitude,
|
||||||
|
"timestamp": item.agent_data.timestamp,
|
||||||
|
}
|
||||||
|
for item in data
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@app.post("/processed_agent_data/")
|
@app.post("/processed_agent_data/")
|
||||||
async def create_processed_agent_data(data: List[ProcessedAgentData]):
|
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
|
||||||
# Insert data to database
|
session = SessionLocal()
|
||||||
# Send data to subscribers
|
try:
|
||||||
pass
|
created_data = ProcessedAgentData_to_td(data)
|
||||||
|
stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data)
|
||||||
|
result = session.execute(stmt)
|
||||||
|
created_records = [dict(row._mapping) for row in result.fetchall()]
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
for record in created_records:
|
||||||
|
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
||||||
|
return created_records
|
||||||
|
except Exception as err:
|
||||||
|
session.rollback()
|
||||||
|
print(f"Database error: {err}")
|
||||||
|
raise HTTPException(status_code=500, detail="Internal Server Error")
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
@app.get(
|
@app.get(
|
||||||
@@ -136,14 +115,34 @@ async def create_processed_agent_data(data: List[ProcessedAgentData]):
|
|||||||
response_model=ProcessedAgentDataInDB,
|
response_model=ProcessedAgentDataInDB,
|
||||||
)
|
)
|
||||||
def read_processed_agent_data(processed_agent_data_id: int):
|
def read_processed_agent_data(processed_agent_data_id: int):
|
||||||
# Get data by id
|
session = SessionLocal()
|
||||||
pass
|
try:
|
||||||
|
stmt = select(processed_agent_data).where(
|
||||||
|
processed_agent_data.c.id == processed_agent_data_id
|
||||||
|
)
|
||||||
|
res = session.execute(stmt).fetchone()
|
||||||
|
if not res:
|
||||||
|
raise HTTPException(status_code=404, detail="Not found")
|
||||||
|
|
||||||
|
return dict(res._mapping)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB])
|
@app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB])
|
||||||
def list_processed_agent_data():
|
def list_processed_agent_data():
|
||||||
# Get list of data
|
session = SessionLocal()
|
||||||
pass
|
try:
|
||||||
|
stmt = select(processed_agent_data)
|
||||||
|
res = session.execute(stmt).fetchall()
|
||||||
|
if not res:
|
||||||
|
raise HTTPException(status_code=404, detail="Not found")
|
||||||
|
|
||||||
|
return [dict(r._mapping) for r in res]
|
||||||
|
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
@app.put(
|
@app.put(
|
||||||
@@ -152,7 +151,41 @@ def list_processed_agent_data():
|
|||||||
)
|
)
|
||||||
def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData):
|
def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData):
|
||||||
# Update data
|
# Update data
|
||||||
pass
|
session = SessionLocal()
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = select(processed_agent_data).where(
|
||||||
|
processed_agent_data.c.id == processed_agent_data_id
|
||||||
|
)
|
||||||
|
result = session.execute(query).fetchone()
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
raise HTTPException(status_code=404, detail="Data not found")
|
||||||
|
|
||||||
|
update_query = (
|
||||||
|
processed_agent_data.update()
|
||||||
|
.where(processed_agent_data.c.id == processed_agent_data_id)
|
||||||
|
.values(
|
||||||
|
road_state=data.road_state,
|
||||||
|
user_id=data.agent_data.user_id,
|
||||||
|
x=data.agent_data.accelerometer.x,
|
||||||
|
y=data.agent_data.accelerometer.y,
|
||||||
|
z=data.agent_data.accelerometer.z,
|
||||||
|
latitude=data.agent_data.gps.latitude,
|
||||||
|
longitude=data.agent_data.gps.longitude,
|
||||||
|
timestamp=data.agent_data.timestamp,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
session.execute(update_query)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
updated_result = session.execute(query).fetchone()
|
||||||
|
|
||||||
|
return ProcessedAgentDataInDB(**updated_result._mapping)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
@app.delete(
|
@app.delete(
|
||||||
@@ -161,8 +194,28 @@ def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAge
|
|||||||
)
|
)
|
||||||
def delete_processed_agent_data(processed_agent_data_id: int):
|
def delete_processed_agent_data(processed_agent_data_id: int):
|
||||||
# Delete by id
|
# Delete by id
|
||||||
pass
|
session = SessionLocal()
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = select(processed_agent_data).where(
|
||||||
|
processed_agent_data.c.id == processed_agent_data_id
|
||||||
|
)
|
||||||
|
result = session.execute(query).fetchone()
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
raise HTTPException(status_code=404, detail="Data not found")
|
||||||
|
|
||||||
|
delete_query = processed_agent_data.delete().where(
|
||||||
|
processed_agent_data.c.id == processed_agent_data_id
|
||||||
|
)
|
||||||
|
|
||||||
|
session.execute(delete_query)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
return ProcessedAgentDataInDB(**result._mapping)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|||||||
Binary file not shown.
51
store/schemas.py
Normal file
51
store/schemas.py
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentDataInDB(BaseModel):
|
||||||
|
id: int
|
||||||
|
road_state: str
|
||||||
|
user_id: int
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
|
||||||
|
# FastAPI models
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
user_id: int
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def check_timestamp(cls, value):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
3
store/test-entry.sh
Executable file
3
store/test-entry.sh
Executable file
@@ -0,0 +1,3 @@
|
|||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
PYTHONPATH=$PWD python3 test/main_test.py
|
||||||
39
store/test/main_test.py
Normal file
39
store/test/main_test.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
from schemas import AccelerometerData, AgentData, GpsData, ProcessedAgentData
|
||||||
|
|
||||||
|
import main
|
||||||
|
|
||||||
|
def _test_ProcessedAgentData_to_td():
|
||||||
|
processed_data_batch = [
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 1,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 2,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
ProcessedAgentData(road_state = "normal",
|
||||||
|
agent_data = AgentData(user_id = 3,
|
||||||
|
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||||
|
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||||
|
timestamp = "2023-07-21T12:34:56Z")
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
res = main.ProcessedAgentData_to_td(processed_data_batch)
|
||||||
|
|
||||||
|
assert res[0]["user_id"] == 1
|
||||||
|
assert res[1]["user_id"] == 2
|
||||||
|
assert res[2]["user_id"] == 3
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_functions = [i for i in dir() if i.startswith('_test_')]
|
||||||
|
|
||||||
|
for i in test_functions:
|
||||||
|
print(i)
|
||||||
|
eval(i)()
|
||||||
19
utils/check-up.py
Normal file
19
utils/check-up.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
|
print("Checking for dead containers...")
|
||||||
|
|
||||||
|
l = [i for i in sys.stdin.read().split("\n") if i]
|
||||||
|
header, statuses = l[0], l[1:]
|
||||||
|
|
||||||
|
status_index = header.find('STATUS')
|
||||||
|
name_index = header.find('NAMES')
|
||||||
|
|
||||||
|
exit_code = 0
|
||||||
|
|
||||||
|
for i in statuses:
|
||||||
|
if not i[status_index:].startswith("Up "):
|
||||||
|
service_name = i[name_index:]
|
||||||
|
print(f"Crash detected in {service_name}")
|
||||||
|
exit_code = 1
|
||||||
|
|
||||||
|
sys.exit(1)
|
||||||
Reference in New Issue
Block a user