Compare commits

...

84 Commits

Author SHA1 Message Date
hasslesstech 5ab16fec72 [P] Add TRACK_ID selection in MapView
Component testing / Hub testing (push) Successful in 25s
Component testing / Store testing (push) Successful in 31s
Component testing / Integration smoke testing (push) Successful in 2m26s
2026-03-25 22:58:23 +02:00
hasslesstech d633926a1a [P] Fix wrong row sending order
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 25s
Component testing / Integration smoke testing (push) Successful in 2m39s
2026-03-25 22:57:52 +02:00
hasslesstech 121bd007b3 [P] Add container logs printout on crash
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Has been cancelled
2026-03-25 22:56:12 +02:00
hasslesstech db63eb6d79 [P] Improve logging logic in agent
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
hasslesstech 77d6968297 [P] Fix map lat, lon confusion
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
hasslesstech 0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus 65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
hasslesstech 0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko 2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus 38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
hasslesstech c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
hasslesstech bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
hasslesstech a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
hasslesstech 764fb77f27 Merge from upstream 'github/dev' into dev
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m14s
2026-03-24 15:21:56 +02:00
VladiusVostokus a55fc17711 Merge pull request #27 from Rhinemann/lab4/hrynko-SCRUM-80-hubadapter
implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
2026-03-24 13:20:39 +00:00
hasslesstech b34e385128 [L4] Remove irrelevant environment variables 2026-03-24 14:42:13 +02:00
hasslesstech a8a0ef5e15 [L4] Remove unused import 2026-03-24 14:09:53 +02:00
hasslesstech 00b037a243 [L4] Remove uncessecary environment variable left over after print -> logging usage switch 2026-03-24 14:07:49 +02:00
hasslesstech d1b6c0eed1 [L4] Fix logging level on error message 2026-03-24 14:03:29 +02:00
hasslesstech 5e890d4f03 [L4] Remove obvious single code line comments to reduce risk of misleading comments in the future 2026-03-24 14:02:03 +02:00
hasslesstech a8e50d0386 [L4] Remove excessive library import and clean up edge codebase 2026-03-24 13:58:32 +02:00
hasslesstech 1b42be264d [L4] Remove misleading batch_size parameter from AgentMQTTAdapter 2026-03-24 13:37:33 +02:00
esk4nz b12bdc334c fix: improve logging clarity and ensure data delivery in AgentMQTTAdapter 2026-03-23 23:41:25 +02:00
esk4nz e8ff1c6cbd Replaced busy-wait loop with threading.Event to fix 100% CPU load 2026-03-23 23:28:17 +02:00
esk4nz ad70519f47 SCRUM-80
Removed manual user_id fallback in data processing
2026-03-23 23:00:19 +02:00
esk4nz b10aec1020 SCRUM-80
Changed print to logging in agent main
2026-03-23 22:57:15 +02:00
esk4nz c085a49c8c implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main
- Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP.
- Edge: Updated AgentData entity and processing logic to support user_id.
- Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
2026-03-23 21:31:31 +02:00
hasslesstech 0b8d2eb18b [P] Add container rebuilding on every rerun
Component testing / Hub testing (push) Successful in 18s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m33s
2026-03-23 18:31:43 +02:00
hasslesstech 2846130e4e [P] Add docker reset workflow
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 21s
Component testing / Integration smoke testing (push) Successful in 1m34s
2026-03-23 18:26:38 +02:00
hasslesstech 30af132033 [P] Add general smoke test and Store incremental test
Component testing / Hub testing (push) Successful in 22s
Component testing / Store testing (push) Successful in 19s
Component testing / Integration smoke testing (push) Successful in 1m28s
2026-03-23 18:01:41 +02:00
hasslesstech 60a846d8b8 [P] Refactor testing code 2026-03-23 16:10:09 +02:00
hasslesstech fe6bb6ab3a [P] Add CI for updated Hub component part
Hub component testing / hub-test (push) Successful in 25s
2026-03-23 15:53:44 +02:00
ІО-23 Shmuliar Oleh 30f81ec1ae Merge pull request #26 from Rhinemann/lab4/shved-SCRUM-95-test-repo-functionality
set up global docker-compose
2026-03-22 21:59:07 +02:00
hasslesstech 1b6f47fa0d [L4] Fix relative paths after file move 2026-03-22 21:13:44 +02:00
Rhinemann b1e6ad7c94 set up global docker-compose 2026-03-22 14:07:29 +01:00
VladiusVostokus 1eddfd966b Merge pull request #24 from Rhinemann/lab5/shmuliar-SCRUM-92-mapview-store-integration
SCRUM 92: mapview store integration
2026-03-14 16:07:30 +00:00
hasslesstech 8af68d6dd9 hotfix: index overflow on user_id 2026-03-13 20:47:09 +02:00
hasslesstech 63aca15824 add multiuser rendering support 2026-03-13 20:41:16 +02:00
hasslesstech ee509f72a4 pull data in MapView/main.py from actual data source 2026-03-13 19:02:07 +02:00
hasslesstech da9fe69d4e add initial server->client update with all current DB data 2026-03-13 19:01:33 +02:00
hasslesstech 1c856dca0e fix MapView/main.py crash due to wrong check condition 2026-03-13 18:58:28 +02:00
VladiusVostokus 17738d07fe Merge pull request #21 from Rhinemann/lab5/gryshaiev-SCRUM-90-set-bump-marker
SCRUM-90: implement set_bump_marker
2026-03-11 17:02:07 +00:00
VladiusVostokus 6b5831ff1b Merge branch 'dev' into lab5/gryshaiev-SCRUM-90-set-bump-marker 2026-03-11 17:01:54 +00:00
VladiusVostokus 54505db70e Merge pull request #23 from Rhinemann/lab5/gryshaiev-SCRUM-89-set-pothole-marker
SCRUM-89: implement set_pothole_marker()
2026-03-11 16:59:35 +00:00
SimonSanich 6f4b3b0ea6 SCRUM-90: implement set_bump_marker 2026-03-11 18:36:40 +02:00
SimonSanich 948a936a1f lab 5: implement set_bump_marker() 2026-03-11 18:10:18 +02:00
esk4nz 87facff668 Merge pull request #16 from Rhinemann/lab3/hrynko-SCRUM-77-post_to_storeAPI_from_hub
SCRUM-77 Post method from Hub to Store
2026-03-09 23:04:16 +02:00
VladiusVostokus 294ed5958e Merge pull request #13 from Rhinemann/lab5/kovalenko-SCRUM-98-FileReader
SCRUM-98 file reader
2026-03-08 15:48:13 +00:00
VladiusVostokus cbdf81c028 Merge pull request #19 from Rhinemann/lab5/slobodeniuk-SCRUM-84-MapViewApp
Lab5/slobodeniuk scrum 84 map view app
2026-03-08 15:43:27 +00:00
Senya a98223cbb0 feature: SCRUM-84 add on_start, update, build methods in MapViewMap 2026-03-08 13:32:23 +02:00
Slobodeniuk Sasha 0bd3eaa91d Merge pull request #18 from Rhinemann/dev
Scrum 87 - MapViewApp (check_road_quality, update_car_marker)
2026-03-08 13:08:59 +02:00
VladiusVostokus 9bdb98c19b Merge pull request #17 from Rhinemann/lab5/yushchenko-SCRUM-87-check-road-quality-method
SCRUM-87, 88: add methods update_car_marker and check_road_quality
2026-03-08 11:00:53 +00:00
AndriiJushchenko c3b71daaeb add check_road_quality method 2026-03-07 17:12:23 +02:00
AndriiJushchenko 31c760f4a8 add method update_car_marker(self, point) 2026-03-07 16:55:20 +02:00
esk4nz 24aeb1a19f changed Batch size to 20
reworked method "on_message"
implemented storeApiAdapter
2026-03-06 00:08:08 +02:00
ІМ-24 Владислав Коваленко 4a81434cb6 feat: handle empty rows 2026-03-03 17:05:53 +00:00
ІО-23 Shmuliar Oleh ceffcfeac2 Merge pull request #14 from Rhinemann/dev
Commit repository configuration to main branch
2026-03-03 18:44:31 +02:00
VladiusVostokus 312177e087 Merge pull request #12 from Rhinemann/lab3/hrynko-SCRUM-56-repo_set_up
SCRUM-56 Repo setup
2026-03-03 15:24:51 +00:00
ІМ-24 Владислав Коваленко a52da042ef refactor: ignore spaces in row 2026-03-03 15:17:25 +00:00
ІМ-24 Владислав Коваленко 11c590cf25 feat: start read file again if got to the end of file 2026-03-03 15:11:38 +00:00
ІМ-24 Владислав Коваленко 550d29c48c refactor: move accelerator dataclas to domain folder 2026-03-03 14:59:46 +00:00
ІМ-24 Владислав Коваленко 8a1327b10a fix: remove empty rows 2026-03-03 14:55:44 +00:00
ІМ-24 Владислав Коваленко db1b7cc6fc feat: read row from file 2026-03-03 14:36:56 +00:00
ІМ-24 Владислав Коваленко a899ef6a6e feat: get indexes of file header fields 2026-03-03 14:10:18 +00:00
ІМ-24 Владислав Коваленко 95176ea467 feat: close file 2026-03-03 14:04:27 +00:00
ІМ-24 Владислав Коваленко 081a2d4240 fix: file field name 2026-03-03 14:03:53 +00:00
ІМ-24 Владислав Коваленко 92c20ef612 feat: open file 2026-03-03 13:59:40 +00:00
ІМ-24 Владислав Коваленко c31363aa57 feat: add empty methods to class 2026-03-03 13:46:26 +00:00
esk4nz f96930a259 Setting up the repo with correct dockerfiles and docker-copmose files, and deleting .idea from git 2026-03-02 23:21:36 +02:00
Andriy Yushchenko 87df394352 Merge pull request #11 from Rhinemann/lab1-slobodeniuk-feature-SCRUM-34
Lab1 slobodeniuk feature scrum 34
2026-03-01 19:57:40 +02:00
VladiusVostokus 10ad9774a7 Merge pull request #10 from Rhinemann/lab1
Lab1
2026-02-28 18:46:27 +00:00
VladiusVostokus b730dbb74c Merge pull request #8 from Rhinemann/lab2
Lab2
2026-02-28 18:45:54 +00:00
VladiusVostokus e4e585b9ac Merge pull request #9 from Rhinemann/feature/lab2-update-delete
Feature/lab2 update delete
2026-02-27 13:21:26 +00:00
anastasia-sl 185b0aae58 implemented update and delete endpoints 2026-02-26 14:55:51 +02:00
Slobodeniuk Sasha 3931fa58c1 Merge pull request #7 from Rhinemann/feature/slobodeniuk-lab2-SCRUM-48-GET
SCRUM-48: feature with CRUD (GET)
2026-02-26 11:22:14 +02:00
Senya 98fb6aa12a SCRUM-48: feature with CRUD (GET) 2026-02-26 11:20:47 +02:00
VladiusVostokus ea9be3fb57 Merge pull request #6 from Rhinemann/lab2_yushchenko
CRUD операції (POST, websocket)
2026-02-25 17:10:36 +00:00
AndriiJushchenko f3512e4afb Трохи пофіксив функцію post і провів тести post і websoket. 2026-02-25 19:05:25 +02:00
AndriiJushchenko 69e679eccf SCRUM-[49, 54] Реалізувати POST та відправку по websoket під час виконання POST 2026-02-24 22:18:36 +02:00
VladiusVostokus 184098b826 Merge pull request #2 from Rhinemann/lab2_shved
Lab2 shved
2026-02-23 17:28:11 +00:00
VladiusVostokus b2c7427af0 Merge pull request #1 from Rhinemann/lab1_shved
updated compose file
2026-02-23 16:10:12 +00:00
Rhinemann 1e7516fe7b update requirements 2026-02-22 12:19:33 +01:00
Rhinemann a63864bcaa updated compose file 2026-02-22 12:17:03 +01:00
48 changed files with 2261 additions and 372 deletions
+23
View File
@@ -0,0 +1,23 @@
# IDEs
.idea/
.vscode/
.git/
.gitignore
.dockerignore
.DS_Store
Thumbs.db
# Python
**/__pycache__/
**/*.py[cod]
**/*.pyo
**/*.pyd
venv/
.env
# Logs & Database & Broker data
*.log
**/mosquitto/data/
**/mosquitto/log/
**/postgres_data/
**/pgadmin-data/
+16
View File
@@ -0,0 +1,16 @@
name: Reset docker state
on: workflow_dispatch
jobs:
reset:
runs-on: host-arch-x86_64
name: Reset docker state
steps:
- name: Stop all containers
run: docker stop $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove all containers
run: docker rm $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove extra volumes
run: docker volume rm road_vision_postgres_data road_vision_pgadmin-data
+71
View File
@@ -0,0 +1,71 @@
name: Component testing
on: [push, workflow_dispatch]
jobs:
hub-test:
name: Hub testing
runs-on: host-arch-x86_64
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build Hub testing container
working-directory: IoT-Systems
run: docker build -t local/hub/${{gitea.sha}} -f hub/Dockerfile-test .
- name: Run Hub tests
working-directory: IoT-Systems
run: docker run --rm -it local/hub/${{gitea.sha}}
- name: Clean up containers
if: ${{always()}}
run: docker image rm local/hub/${{gitea.sha}}
store-test:
name: Store testing
runs-on: host-arch-x86_64
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build Store testing container
working-directory: IoT-Systems
run: docker build -t local/store/${{gitea.sha}} -f store/Dockerfile-test .
- name: Run Store tests
working-directory: IoT-Systems
run: docker run --rm -it local/store/${{gitea.sha}}
- name: Clean up containers
if: ${{always()}}
run: docker image rm local/store/${{gitea.sha}}
integration-smoke-test:
name: Integration smoke testing
runs-on: host-arch-x86_64
needs:
- hub-test
- store-test
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build all production containers
working-directory: IoT-Systems
run: docker-compose build
- name: Start all production containers
working-directory: IoT-Systems
run: docker-compose up -d
- name: Wait for crashes to happen
run: sleep 30
- name: Check for dead containers
working-directory: IoT-Systems
run: docker ps -a | python3 utils/check-up.py
- name: Clean up
if: ${{always()}}
working-directory: IoT-Systems
run: docker-compose down -v
+24 -3
View File
@@ -1,4 +1,25 @@
agent/docker/mosquitto/data/ # IDEs
agent/docker/mosquitto/log/
.idea/ .idea/
.vscode/
*.swp
*.swo
# Python
venv/
__pycache__/
*.py[cod]
*$py.class
.env
# Logs
*.log
app.log
# Database & Broker data
**/mosquitto/data/
**/mosquitto/log/
**/postgres_data/
**/pgadmin-data/
# OS specific
.DS_Store
-3
View File
@@ -1,3 +0,0 @@
.idea
venv
__pycache__
+2
View File
@@ -2,3 +2,5 @@ import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost" STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000 STORE_PORT = os.environ.get("STORE_PORT") or 8000
TRACK_ID = int(os.environ.get("TID") or '1')
+1422 -1
View File
File diff suppressed because one or more lines are too long
+2 -1
View File
@@ -72,9 +72,10 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.latitude,
processed_agent_data.longitude, processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id
) )
for processed_agent_data in processed_agent_data_list for processed_agent_data in processed_agent_data_list
] ]
+7
View File
@@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class Accelerometer:
x: float
y: float
z: float
+50
View File
@@ -0,0 +1,50 @@
from csv import reader
import config
from domain.accelerometer import Accelerometer
class FileReader:
def __init__(
self, data_filename: str,
) -> None:
self.file_path = data_filename
pass
def read(self):
return self.getNextValue()
def startReading(self, *args, **kwargs):
self.file = open(self.file_path, newline='')
self.file_reader = reader(self.file, skipinitialspace=True)
file_header = next(self.file_reader)
self.x_idx = file_header.index('X')
self.y_idx = file_header.index('Y')
self.z_idx = file_header.index('Z')
def getNextValue(self):
while True:
row = next(self.file_reader, None)
if row is None:
self._rewind_file()
continue
try:
x = int(row[self.x_idx])
y = int(row[self.y_idx])
z = int(row[self.z_idx])
return Accelerometer(x=x, y=y, z=z)
except Exception as e:
continue
def _rewind_file(self):
self.file.seek(0)
self.file_reader = reader(self.file)
next(self.file_reader)
def stopReading(self, *args, **kwargs):
if self.file:
self.file.close()
self.file_reader = None
+106 -11
View File
@@ -4,48 +4,143 @@ from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock from kivy.clock import Clock
from lineMapLayer import LineMapLayer from lineMapLayer import LineMapLayer
from datasource import Datasource from datasource import Datasource
import config
line_layer_colors = [
[1, 0, 0, 1],
[1, 0.5, 0, 1],
[0, 1, 0, 1],
[0, 1, 1, 1],
[0, 0, 1, 1],
[1, 0, 1, 1],
]
class MapViewApp(App): class MapViewApp(App):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__() super().__init__(**kwargs)
self.mapview = None
self.datasource = Datasource(user_id=1)
self.line_layers = dict()
self.car_markers = dict()
# додати необхідні змінні # додати необхідні змінні
self.bump_markers = []
self.pothole_markers = []
def on_start(self): def on_start(self):
""" """
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update()
Clock.schedule_interval(self.update, 0.1)
def update(self, *args): def update(self, *args):
""" """
Викликається регулярно для оновлення мапи Викликається регулярно для оновлення мапи
""" """
new_points = self.datasource.get_new_points()
if not new_points:
return
def update_car_marker(self, point): for point in new_points:
lat, lon, road_state, user_id = point
# Оновлює лінію маршрута
if user_id not in self.line_layers:
self.line_layers[user_id] = LineMapLayer(color = line_layer_colors[user_id % len(line_layer_colors)])
self.mapview.add_layer(self.line_layers[user_id])
self.line_layers[user_id].add_point((lat, lon))
# Оновлює маркер маниши
self.update_car_marker(lat, lon, user_id)
# Перевіряємо стан дороги
self.check_road_quality(point)
def check_road_quality(self, point):
"""
Аналізує дані акселерометра для подальшого визначення
та відображення ям та лежачих поліцейських
"""
if len(point) < 3:
return
lat, lon, road_state, user_id = point
if road_state == "pothole":
self.set_pothole_marker((lat, lon))
elif road_state == "bump":
self.set_bump_marker((lat, lon))
def update_car_marker(self, lat, lon, user_id):
""" """
Оновлює відображення маркера машини на мапі Оновлює відображення маркера машини на мапі
:param point: GPS координати :param point: GPS координати
""" """
if user_id not in self.car_markers:
self.car_markers[user_id] = MapMarker(lat=lat, lon=lon, source='./images/car.png')
self.mapview.add_marker(self.car_markers[user_id])
else:
self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon
if user_id == config.TRACK_ID:
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):
""" if isinstance(point, dict):
Встановлює маркер для ями lat = point.get("lat")
:param point: GPS координати lon = point.get("lon")
""" else:
lat, lon = point
if lat is None or lon is None:
return
marker = MapMarker(
lat=lat,
lon=lon,
source="images/pothole.png"
)
self.mapview.add_marker(marker)
self.pothole_markers.append(marker)
def set_bump_marker(self, point): def set_bump_marker(self, point):
""" if isinstance(point, dict):
Встановлює маркер для лежачого поліцейського lat = point.get("lat")
:param point: GPS координати lon = point.get("lon")
""" else:
lat, lon = point
if lat is None or lon is None:
return
marker = MapMarker(
lat=lat,
lon=lon,
source="images/bump.png"
)
self.mapview.add_marker(marker)
self.bump_markers.append(marker)
def build(self): def build(self):
""" """
Ініціалізує мапу MapView(zoom, lat, lon) Ініціалізує мапу MapView(zoom, lat, lon)
:return: мапу :return: мапу
""" """
self.mapview = MapView() self.mapview = MapView(
zoom=15,
lat=50.4501,
lon=30.5234
)
return self.mapview return self.mapview
-2
View File
@@ -1,2 +0,0 @@
venv
__pycache__
+3 -3
View File
@@ -3,10 +3,10 @@ FROM python:latest
# set the working directory in the container # set the working directory in the container
WORKDIR /usr/agent WORKDIR /usr/agent
# copy the dependencies file to the working directory # copy the dependencies file to the working directory
COPY requirements.txt . COPY agent/requirements.txt .
# install dependencies # install dependencies
RUN pip install -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# copy the content of the local src directory to the working directory # copy the content of the local src directory to the working directory
COPY src/ . COPY agent/src/ .
# command to run on container start # command to run on container start
CMD ["python", "main.py"] CMD ["python", "main.py"]
-32
View File
@@ -1,32 +0,0 @@
name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
fake_agent:
container_name: agent
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
networks:
mqtt_network:
networks:
mqtt_network:
+1 -1
View File
@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = 1 USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
+3 -3
View File
@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float
+7 -5
View File
@@ -15,6 +15,7 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row return row
@staticmethod def _parse_acc(self, row: List[str]) -> Accelerometer:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) x = int(row[0]) / self.acc_divisor
y = int(row[1]) y = int(row[1]) / self.acc_divisor
z = int(row[2]) z = int(row[2]) / self.acc_divisor
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
+7 -2
View File
@@ -1,6 +1,7 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import logging
import config import config
@@ -15,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -28,16 +31,18 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") logging.error(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)
+3 -3
View File
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Int() x = fields.Float()
y = fields.Int() y = fields.Float()
z = fields.Int() z = fields.Float()
@@ -1,13 +1,12 @@
version: "3.9" name: "road_vision"
name: "road_vision__hub"
services: services:
mqtt: mqtt:
image: eclipse-mosquitto image: eclipse-mosquitto
container_name: mqtt container_name: mqtt
volumes: volumes:
- ./mosquitto:/mosquitto - ./agent/docker/mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data - ./agent/docker/mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log - ./agent/docker/mosquitto/log:/mosquitto/log
ports: ports:
- 1883:1883 - 1883:1883
- 9001:9001 - 9001:9001
@@ -15,8 +14,45 @@ services:
mqtt_network: mqtt_network:
fake_agent:
container_name: agent
build:
context: .
dockerfile: agent/Dockerfile
depends_on:
- mqtt
environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
networks:
mqtt_network:
edge:
container_name: edge
build:
context: .
dockerfile: edge/Dockerfile
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "hub"
HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
postgres_db: postgres_db:
image: postgres:latest image: postgres:17
container_name: postgres_db container_name: postgres_db
restart: always restart: always
environment: environment:
@@ -25,13 +61,12 @@ services:
POSTGRES_DB: test_db POSTGRES_DB: test_db
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql - ./store/docker/db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports: ports:
- "5432:5432" - "5432:5432"
networks: networks:
db_network: db_network:
pgadmin: pgadmin:
container_name: pgadmin4 container_name: pgadmin4
image: dpage/pgadmin4 image: dpage/pgadmin4
@@ -49,11 +84,14 @@ services:
store: store:
container_name: store container_name: store
build: ../../store build:
context: .
dockerfile: store/Dockerfile
depends_on: depends_on:
- postgres_db - postgres_db
restart: always restart: always
environment: environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user POSTGRES_USER: user
POSTGRES_PASSWORD: pass POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db POSTGRES_DB: test_db
@@ -77,12 +115,15 @@ services:
hub: hub:
container_name: hub container_name: hub
build: ../ build:
context: .
dockerfile: hub/Dockerfile
depends_on: depends_on:
- mqtt - mqtt
- redis - redis
- store - store
environment: environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store" STORE_API_HOST: "store"
STORE_API_PORT: 8000 STORE_API_PORT: 8000
REDIS_HOST: "redis" REDIS_HOST: "redis"
@@ -90,7 +131,7 @@ services:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "processed_data_topic" MQTT_TOPIC: "processed_data_topic"
BATCH_SIZE: 1 BATCH_SIZE: 20
ports: ports:
- "9000:8000" - "9000:8000"
networks: networks:
@@ -98,10 +139,11 @@ services:
hub_store: hub_store:
hub_redis: hub_redis:
networks: networks:
mqtt_network: mqtt_network:
db_network: db_network:
edge_hub:
hub:
hub_store: hub_store:
hub_redis: hub_redis:
-2
View File
@@ -1,2 +0,0 @@
venv
app.log
+2 -2
View File
@@ -3,9 +3,9 @@ FROM python:3.9-slim
# Set the working directory inside the container # Set the working directory inside the container
WORKDIR /app WORKDIR /app
# Copy the requirements.txt file and install dependencies # Copy the requirements.txt file and install dependencies
COPY requirements.txt . COPY edge/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container # Copy the entire application into the container
COPY . . COPY edge/. .
# Run the main.py script inside the container when it starts # Run the main.py script inside the container when it starts
CMD ["python", "main.py"] CMD ["python", "main.py"]
+9 -32
View File
@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
broker_port, broker_port,
topic, topic,
hub_gateway: HubGateway, hub_gateway: HubGateway,
batch_size=10,
): ):
self.batch_size = batch_size
# MQTT # MQTT
self.broker_host = broker_host self.broker_host = broker_host
self.broker_port = broker_port self.broker_port = broker_port
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway""" """Processing agent data and sent it to hub gateway"""
try: try:
payload: str = msg.payload.decode("utf-8") payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True) agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data) processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data): if self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available") logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e: except Exception as e:
logging.info(f"Error processing MQTT message: {e}") logging.error(f"Error processing MQTT message: {e}")
def connect(self): def connect(self):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60) self.client.connect(self.broker_host, self.broker_port, 60)
def start(self): def loop_forever(self):
self.client.loop_start() self.client.loop_forever()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")
+1
View File
@@ -14,6 +14,7 @@ class GpsData(BaseModel):
class AgentData(BaseModel): class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData accelerometer: AccelerometerData
gps: GpsData gps: GpsData
timestamp: datetime timestamp: datetime
+2 -9
View File
@@ -26,15 +26,8 @@ class AgentGateway(ABC):
pass pass
@abstractmethod @abstractmethod
def start(self): def loop_forever(self):
""" """
Method to start listening for messages from the agent. Method to await for new messages.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
""" """
pass pass
+22 -1
View File
@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -12,4 +13,24 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)
+5 -2
View File
@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT # Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
# Configuration for the Hub # Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost" HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"
-48
View File
@@ -1,48 +0,0 @@
version: "3.9"
# name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 19001:9001
networks:
mqtt_network:
edge:
container_name: edge
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " "
HUB_HOST: "store"
HUB_PORT: 8000
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
networks:
mqtt_network:
db_network:
edge_hub:
hub:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:
+25 -16
View File
@@ -10,42 +10,51 @@ from config import (
HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT, HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC, HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
) )
if __name__ == "__main__": if __name__ == "__main__":
# Configure logging settings # Configure logging settings
logging.basicConfig( logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[ handlers=[
logging.StreamHandler(), # Output log messages to the console logging.StreamHandler(),
logging.FileHandler("app.log"), # Save log messages to a file logging.FileHandler("app.log"),
], ],
) )
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter( # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# api_base_url=HUB_URL, # This allows easy switching between HTTP and MQTT protocols
# ) if HUB_CONNECTION_TYPE.lower() == "http":
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
hub_adapter = HubHttpAdapter(
api_base_url=HUB_URL,
)
else:
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter( hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST, broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT, port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC, topic=HUB_MQTT_TOPIC,
) )
# Create an instance of the AgentMQTTAdapter using the configuration
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter( agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST, broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT, broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC, topic=MQTT_TOPIC,
hub_gateway=hub_adapter, hub_gateway=hub_adapter,
) )
try: try:
# Connect to the MQTT broker and start listening for messages logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
agent_adapter.connect() agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed) logging.info("Broker connection success. Waiting for data...")
while True: agent_adapter.loop_forever()
pass
except KeyboardInterrupt: except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user logging.info("Interrupt signal received. Shutting down...")
agent_adapter.stop() agent_adapter.disconnect()
logging.info("System stopped.") logging.info("Disconnected from MQTT broker.")
-2
View File
@@ -1,2 +0,0 @@
venv
__pycache__
+3 -2
View File
@@ -3,9 +3,10 @@ FROM python:3.9-slim
# Set the working directory inside the container # Set the working directory inside the container
WORKDIR /app WORKDIR /app
# Copy the requirements.txt file and install dependencies # Copy the requirements.txt file and install dependencies
COPY requirements.txt . COPY hub/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container # Copy the entire application into the container
COPY . . COPY hub/. .
# Run the main.py script inside the container when it starts # Run the main.py script inside the container when it starts
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"] CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
+12
View File
@@ -0,0 +1,12 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY hub/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY hub/. .
# Run the main.py script inside the container when it starts
CMD ["./test-entry.sh"]
+35 -8
View File
@@ -13,12 +13,39 @@ class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url): def __init__(self, api_base_url):
self.api_base_url = api_base_url self.api_base_url = api_base_url
def processed_agent_data_batch_to_payload(self, processed_agent_data_batch: List[ProcessedAgentData]):
if not processed_agent_data_batch:
return False
# Extract user_id from the first element
user_id = processed_agent_data_batch[0].agent_data.user_id
payload = {
"data": [item.model_dump(mode='json') for item in processed_agent_data_batch],
"user_id": user_id
}
return payload
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]): def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
""" payload = self.processed_agent_data_batch_to_payload(processed_agent_data_batch)
Save the processed road data to the Store API.
Parameters: if payload == False:
processed_agent_data_batch (dict): Processed road data to be saved. return False
Returns:
bool: True if the data is successfully saved, False otherwise. try:
""" # Perform a POST request to the Store API with a 10-second timeout
# Implement it response = requests.post(
f"{self.api_base_url}/processed_agent_data/",
json=payload,
timeout=10
)
if response.status_code == 200:
logging.info(f"Batch of {len(processed_agent_data_batch)} items sent to Store.")
return True
else:
logging.error(f"Store API error: {response.status_code} - {response.text}")
return False
except Exception as e:
logging.error(f"Failed to send data to Store: {e}")
return False
@@ -0,0 +1,41 @@
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
def _test_processed_agent_data_batch_to_payload():
processed_data_batch = [
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 1,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 2,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 3,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
]
res = StoreApiAdapter(None).processed_agent_data_batch_to_payload(processed_data_batch)
assert res["data"][0]["agent_data"]["user_id"] == 1
assert res["data"][1]["agent_data"]["user_id"] == 2
assert res["data"][2]["agent_data"]["user_id"] == 3
assert StoreApiAdapter(None).processed_agent_data_batch_to_payload([]) == False
if __name__ == "__main__":
test_functions = [i for i in dir() if i.startswith('_test_')]
for i in test_functions:
print(i)
eval(i)()
+8 -6
View File
@@ -70,18 +70,20 @@ def on_message(client, userdata, msg):
processed_agent_data = ProcessedAgentData.model_validate_json( processed_agent_data = ProcessedAgentData.model_validate_json(
payload, strict=True payload, strict=True
) )
redis_client.lpush( redis_client.lpush(
"processed_agent_data", processed_agent_data.model_dump_json() "processed_agent_data", processed_agent_data.model_dump_json()
) )
processed_agent_data_batch: List[ProcessedAgentData] = []
if redis_client.llen("processed_agent_data") >= BATCH_SIZE: if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
processed_agent_data_batch: List[ProcessedAgentData] = []
for _ in range(BATCH_SIZE): for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json( raw_data = redis_client.lpop("processed_agent_data")
redis_client.lpop("processed_agent_data") if raw_data:
) data_item = ProcessedAgentData.model_validate_json(raw_data)
processed_agent_data_batch.append(processed_agent_data) processed_agent_data_batch.append(data_item)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch) store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"} return {"status": "ok"}
except Exception as e: except Exception as e:
logging.info(f"Error processing MQTT message: {e}") logging.info(f"Error processing MQTT message: {e}")
+3
View File
@@ -0,0 +1,3 @@
#!/bin/sh
PYTHONPATH=$PWD python3 app/adapters/store_api_adapter_test.py
-3
View File
@@ -1,3 +0,0 @@
venv
__pycache__
.idea
+3 -2
View File
@@ -3,9 +3,10 @@ FROM python:latest
# Set the working directory inside the container # Set the working directory inside the container
WORKDIR /app WORKDIR /app
# Copy the requirements.txt file and install dependencies # Copy the requirements.txt file and install dependencies
COPY requirements.txt . COPY store/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container # Copy the entire application into the container
COPY . . COPY store/. .
# Run the main.py script inside the container when it starts # Run the main.py script inside the container when it starts
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"] CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
+13
View File
@@ -0,0 +1,13 @@
# Use the official Python image as the base image
FROM python:latest
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY store/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY store/. .
# Run the main.py script inside the container when it starts
#CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
CMD ["./test-entry.sh"]
View File
+15
View File
@@ -0,0 +1,15 @@
from sqlalchemy import MetaData
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from config import POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
engine = create_engine(DATABASE_URL)
Base = declarative_base()
metadata = MetaData()
SessionLocal = sessionmaker(bind=engine)
-60
View File
@@ -1,60 +0,0 @@
version: "3.9"
name: "road_vision__database"
services:
postgres_db:
image: postgres:latest
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
volumes:
- pgadmin-data:/var/lib/pgadmin
ports:
- "5050:80"
networks:
db_network:
store:
container_name: store
build: ..
depends_on:
- postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
ports:
- "8000:8000"
networks:
db_network:
networks:
db_network:
volumes:
postgres_data:
pgadmin-data:
+139 -89
View File
@@ -1,10 +1,8 @@
import asyncio
import json import json
from typing import Set, Dict, List, Any from typing import Set, Dict, List
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Body
from fastapi.encoders import jsonable_encoder
from sqlalchemy import ( from sqlalchemy import (
create_engine,
MetaData,
Table, Table,
Column, Column,
Integer, Integer,
@@ -12,25 +10,14 @@ from sqlalchemy import (
Float, Float,
DateTime, DateTime,
) )
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import select from sqlalchemy.sql import select
from datetime import datetime
from pydantic import BaseModel, field_validator from database import metadata, SessionLocal
from config import ( from schemas import ProcessedAgentData, ProcessedAgentDataInDB
POSTGRES_HOST,
POSTGRES_PORT,
POSTGRES_DB,
POSTGRES_USER,
POSTGRES_PASSWORD,
)
# FastAPI app setup # FastAPI app setup
app = FastAPI() app = FastAPI()
# SQLAlchemy setup
DATABASE_URL = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
engine = create_engine(DATABASE_URL)
metadata = MetaData()
# Define the ProcessedAgentData table
processed_agent_data = Table( processed_agent_data = Table(
"processed_agent_data", "processed_agent_data",
metadata, metadata,
@@ -44,91 +31,80 @@ processed_agent_data = Table(
Column("longitude", Float), Column("longitude", Float),
Column("timestamp", DateTime), Column("timestamp", DateTime),
) )
SessionLocal = sessionmaker(bind=engine)
# SQLAlchemy model
class ProcessedAgentDataInDB(BaseModel):
id: int
road_state: str
user_id: int
x: float
y: float
z: float
latitude: float
longitude: float
timestamp: datetime
# FastAPI models
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@classmethod
@field_validator("timestamp", mode="before")
def check_timestamp(cls, value):
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {} subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@app.websocket("/ws/{user_id}") @app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
if user_id not in subscriptions:
subscriptions[user_id] = set() subscriptions.add(websocket)
subscriptions[user_id].add(websocket)
try: try:
# send already available data
r = processed_agent_data.select()
stored_data = SessionLocal().execute(r).fetchall()
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions[user_id].remove(websocket) subscriptions.remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data): async def send_data_to_subscribers(data):
if user_id in subscriptions: for websocket in subscriptions:
for websocket in subscriptions[user_id]: await websocket.send_json(json.dumps([data]))
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
return [
{
"road_state": item.road_state,
"user_id": item.agent_data.user_id,
"x": item.agent_data.accelerometer.x,
"y": item.agent_data.accelerometer.y,
"z": item.agent_data.accelerometer.z,
"latitude": item.agent_data.gps.latitude,
"longitude": item.agent_data.gps.longitude,
"timestamp": item.agent_data.timestamp,
}
for item in data
]
@app.post("/processed_agent_data/") @app.post("/processed_agent_data/")
async def create_processed_agent_data(data: List[ProcessedAgentData]): async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
# Insert data to database session = SessionLocal()
# Send data to subscribers try:
pass created_data = ProcessedAgentData_to_td(data)
stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data)
result = session.execute(stmt)
created_records = [dict(row._mapping) for row in result.fetchall()]
session.commit()
for record in sorted(created_records, key = lambda x: x['timestamp']):
await send_data_to_subscribers(jsonable_encoder(record))
return created_records
except Exception as err:
session.rollback()
print(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
finally:
session.close()
@app.get( @app.get(
@@ -136,14 +112,34 @@ async def create_processed_agent_data(data: List[ProcessedAgentData]):
response_model=ProcessedAgentDataInDB, response_model=ProcessedAgentDataInDB,
) )
def read_processed_agent_data(processed_agent_data_id: int): def read_processed_agent_data(processed_agent_data_id: int):
# Get data by id session = SessionLocal()
pass try:
stmt = select(processed_agent_data).where(
processed_agent_data.c.id == processed_agent_data_id
)
res = session.execute(stmt).fetchone()
if not res:
raise HTTPException(status_code=404, detail="Not found")
return dict(res._mapping)
finally:
session.close()
@app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB]) @app.get("/processed_agent_data/", response_model=list[ProcessedAgentDataInDB])
def list_processed_agent_data(): def list_processed_agent_data():
# Get list of data session = SessionLocal()
pass try:
stmt = select(processed_agent_data)
res = session.execute(stmt).fetchall()
if not res:
raise HTTPException(status_code=404, detail="Not found")
return [dict(r._mapping) for r in res]
finally:
session.close()
@app.put( @app.put(
@@ -152,7 +148,41 @@ def list_processed_agent_data():
) )
def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData): def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAgentData):
# Update data # Update data
pass session = SessionLocal()
try:
query = select(processed_agent_data).where(
processed_agent_data.c.id == processed_agent_data_id
)
result = session.execute(query).fetchone()
if not result:
raise HTTPException(status_code=404, detail="Data not found")
update_query = (
processed_agent_data.update()
.where(processed_agent_data.c.id == processed_agent_data_id)
.values(
road_state=data.road_state,
user_id=data.agent_data.user_id,
x=data.agent_data.accelerometer.x,
y=data.agent_data.accelerometer.y,
z=data.agent_data.accelerometer.z,
latitude=data.agent_data.gps.latitude,
longitude=data.agent_data.gps.longitude,
timestamp=data.agent_data.timestamp,
)
)
session.execute(update_query)
session.commit()
updated_result = session.execute(query).fetchone()
return ProcessedAgentDataInDB(**updated_result._mapping)
finally:
session.close()
@app.delete( @app.delete(
@@ -161,8 +191,28 @@ def update_processed_agent_data(processed_agent_data_id: int, data: ProcessedAge
) )
def delete_processed_agent_data(processed_agent_data_id: int): def delete_processed_agent_data(processed_agent_data_id: int):
# Delete by id # Delete by id
pass session = SessionLocal()
try:
query = select(processed_agent_data).where(
processed_agent_data.c.id == processed_agent_data_id
)
result = session.execute(query).fetchone()
if not result:
raise HTTPException(status_code=404, detail="Data not found")
delete_query = processed_agent_data.delete().where(
processed_agent_data.c.id == processed_agent_data_id
)
session.execute(delete_query)
session.commit()
return ProcessedAgentDataInDB(**result._mapping)
finally:
session.close()
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
Binary file not shown.
+51
View File
@@ -0,0 +1,51 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class ProcessedAgentDataInDB(BaseModel):
id: int
road_state: str
user_id: int
x: float
y: float
z: float
latitude: float
longitude: float
timestamp: datetime
# FastAPI models
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@classmethod
@field_validator("timestamp", mode="before")
def check_timestamp(cls, value):
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData
+3
View File
@@ -0,0 +1,3 @@
#!/bin/sh
PYTHONPATH=$PWD python3 test/main_test.py
+39
View File
@@ -0,0 +1,39 @@
from schemas import AccelerometerData, AgentData, GpsData, ProcessedAgentData
import main
def _test_ProcessedAgentData_to_td():
processed_data_batch = [
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 1,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 2,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 3,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
]
res = main.ProcessedAgentData_to_td(processed_data_batch)
assert res[0]["user_id"] == 1
assert res[1]["user_id"] == 2
assert res[2]["user_id"] == 3
if __name__ == "__main__":
test_functions = [i for i in dir() if i.startswith('_test_')]
for i in test_functions:
print(i)
eval(i)()
+23
View File
@@ -0,0 +1,23 @@
import sys
import os
print("Checking for dead containers...")
l = [i for i in sys.stdin.read().split("\n") if i]
header, statuses = l[0], l[1:]
status_index = header.find('STATUS')
name_index = header.find('NAMES')
exit_code = 0
for i in statuses:
if not i[status_index:].startswith("Up "):
service_name = i[name_index:]
print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1
sys.exit(exit_code)