Compare commits

...

66 Commits

Author SHA1 Message Date
db63eb6d79 [P] Improve logging logic in agent
All checks were successful
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
77d6968297 [P] Fix map lat, lon confusion
All checks were successful
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus
65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko
d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko
2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus
38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
764fb77f27 Merge from upstream 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m14s
2026-03-24 15:21:56 +02:00
VladiusVostokus
a55fc17711 Merge pull request #27 from Rhinemann/lab4/hrynko-SCRUM-80-hubadapter
implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
2026-03-24 13:20:39 +00:00
b34e385128 [L4] Remove irrelevant environment variables 2026-03-24 14:42:13 +02:00
a8a0ef5e15 [L4] Remove unused import 2026-03-24 14:09:53 +02:00
00b037a243 [L4] Remove uncessecary environment variable left over after print -> logging usage switch 2026-03-24 14:07:49 +02:00
d1b6c0eed1 [L4] Fix logging level on error message 2026-03-24 14:03:29 +02:00
5e890d4f03 [L4] Remove obvious single code line comments to reduce risk of misleading comments in the future 2026-03-24 14:02:03 +02:00
a8e50d0386 [L4] Remove excessive library import and clean up edge codebase 2026-03-24 13:58:32 +02:00
1b42be264d [L4] Remove misleading batch_size parameter from AgentMQTTAdapter 2026-03-24 13:37:33 +02:00
esk4nz
b12bdc334c fix: improve logging clarity and ensure data delivery in AgentMQTTAdapter 2026-03-23 23:41:25 +02:00
esk4nz
e8ff1c6cbd Replaced busy-wait loop with threading.Event to fix 100% CPU load 2026-03-23 23:28:17 +02:00
esk4nz
ad70519f47 SCRUM-80
Removed manual user_id fallback in data processing
2026-03-23 23:00:19 +02:00
esk4nz
b10aec1020 SCRUM-80
Changed print to logging in agent main
2026-03-23 22:57:15 +02:00
esk4nz
c085a49c8c implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main
- Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP.
- Edge: Updated AgentData entity and processing logic to support user_id.
- Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
2026-03-23 21:31:31 +02:00
0b8d2eb18b [P] Add container rebuilding on every rerun
All checks were successful
Component testing / Hub testing (push) Successful in 18s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m33s
2026-03-23 18:31:43 +02:00
2846130e4e [P] Add docker reset workflow
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 21s
Component testing / Integration smoke testing (push) Successful in 1m34s
2026-03-23 18:26:38 +02:00
30af132033 [P] Add general smoke test and Store incremental test
All checks were successful
Component testing / Hub testing (push) Successful in 22s
Component testing / Store testing (push) Successful in 19s
Component testing / Integration smoke testing (push) Successful in 1m28s
2026-03-23 18:01:41 +02:00
60a846d8b8 [P] Refactor testing code 2026-03-23 16:10:09 +02:00
fe6bb6ab3a [P] Add CI for updated Hub component part
All checks were successful
Hub component testing / hub-test (push) Successful in 25s
2026-03-23 15:53:44 +02:00
ІО-23 Shmuliar Oleh
30f81ec1ae Merge pull request #26 from Rhinemann/lab4/shved-SCRUM-95-test-repo-functionality
set up global docker-compose
2026-03-22 21:59:07 +02:00
1b6f47fa0d [L4] Fix relative paths after file move 2026-03-22 21:13:44 +02:00
b1e6ad7c94 set up global docker-compose 2026-03-22 14:07:29 +01:00
VladiusVostokus
1eddfd966b Merge pull request #24 from Rhinemann/lab5/shmuliar-SCRUM-92-mapview-store-integration
SCRUM 92: mapview store integration
2026-03-14 16:07:30 +00:00
8af68d6dd9 hotfix: index overflow on user_id 2026-03-13 20:47:09 +02:00
63aca15824 add multiuser rendering support 2026-03-13 20:41:16 +02:00
ee509f72a4 pull data in MapView/main.py from actual data source 2026-03-13 19:02:07 +02:00
da9fe69d4e add initial server->client update with all current DB data 2026-03-13 19:01:33 +02:00
1c856dca0e fix MapView/main.py crash due to wrong check condition 2026-03-13 18:58:28 +02:00
VladiusVostokus
17738d07fe Merge pull request #21 from Rhinemann/lab5/gryshaiev-SCRUM-90-set-bump-marker
SCRUM-90: implement set_bump_marker
2026-03-11 17:02:07 +00:00
VladiusVostokus
6b5831ff1b Merge branch 'dev' into lab5/gryshaiev-SCRUM-90-set-bump-marker 2026-03-11 17:01:54 +00:00
VladiusVostokus
54505db70e Merge pull request #23 from Rhinemann/lab5/gryshaiev-SCRUM-89-set-pothole-marker
SCRUM-89: implement set_pothole_marker()
2026-03-11 16:59:35 +00:00
SimonSanich
6f4b3b0ea6 SCRUM-90: implement set_bump_marker 2026-03-11 18:36:40 +02:00
SimonSanich
948a936a1f lab 5: implement set_bump_marker() 2026-03-11 18:10:18 +02:00
esk4nz
87facff668 Merge pull request #16 from Rhinemann/lab3/hrynko-SCRUM-77-post_to_storeAPI_from_hub
SCRUM-77 Post method from Hub to Store
2026-03-09 23:04:16 +02:00
VladiusVostokus
294ed5958e Merge pull request #13 from Rhinemann/lab5/kovalenko-SCRUM-98-FileReader
SCRUM-98 file reader
2026-03-08 15:48:13 +00:00
VladiusVostokus
cbdf81c028 Merge pull request #19 from Rhinemann/lab5/slobodeniuk-SCRUM-84-MapViewApp
Lab5/slobodeniuk scrum 84 map view app
2026-03-08 15:43:27 +00:00
Senya
a98223cbb0 feature: SCRUM-84 add on_start, update, build methods in MapViewMap 2026-03-08 13:32:23 +02:00
Slobodeniuk Sasha
0bd3eaa91d Merge pull request #18 from Rhinemann/dev
Scrum 87 - MapViewApp (check_road_quality, update_car_marker)
2026-03-08 13:08:59 +02:00
VladiusVostokus
9bdb98c19b Merge pull request #17 from Rhinemann/lab5/yushchenko-SCRUM-87-check-road-quality-method
SCRUM-87, 88: add methods update_car_marker and check_road_quality
2026-03-08 11:00:53 +00:00
AndriiJushchenko
c3b71daaeb add check_road_quality method 2026-03-07 17:12:23 +02:00
AndriiJushchenko
31c760f4a8 add method update_car_marker(self, point) 2026-03-07 16:55:20 +02:00
esk4nz
24aeb1a19f changed Batch size to 20
reworked method "on_message"
implemented storeApiAdapter
2026-03-06 00:08:08 +02:00
ІМ-24 Владислав Коваленко
4a81434cb6 feat: handle empty rows 2026-03-03 17:05:53 +00:00
ІО-23 Shmuliar Oleh
ceffcfeac2 Merge pull request #14 from Rhinemann/dev
Commit repository configuration to main branch
2026-03-03 18:44:31 +02:00
VladiusVostokus
312177e087 Merge pull request #12 from Rhinemann/lab3/hrynko-SCRUM-56-repo_set_up
SCRUM-56 Repo setup
2026-03-03 15:24:51 +00:00
ІМ-24 Владислав Коваленко
a52da042ef refactor: ignore spaces in row 2026-03-03 15:17:25 +00:00
ІМ-24 Владислав Коваленко
11c590cf25 feat: start read file again if got to the end of file 2026-03-03 15:11:38 +00:00
ІМ-24 Владислав Коваленко
550d29c48c refactor: move accelerator dataclas to domain folder 2026-03-03 14:59:46 +00:00
ІМ-24 Владислав Коваленко
8a1327b10a fix: remove empty rows 2026-03-03 14:55:44 +00:00
ІМ-24 Владислав Коваленко
db1b7cc6fc feat: read row from file 2026-03-03 14:36:56 +00:00
ІМ-24 Владислав Коваленко
a899ef6a6e feat: get indexes of file header fields 2026-03-03 14:10:18 +00:00
ІМ-24 Владислав Коваленко
95176ea467 feat: close file 2026-03-03 14:04:27 +00:00
ІМ-24 Владислав Коваленко
081a2d4240 fix: file field name 2026-03-03 14:03:53 +00:00
ІМ-24 Владислав Коваленко
92c20ef612 feat: open file 2026-03-03 13:59:40 +00:00
ІМ-24 Владислав Коваленко
c31363aa57 feat: add empty methods to class 2026-03-03 13:46:26 +00:00
32 changed files with 2017 additions and 279 deletions

View File

@@ -0,0 +1,16 @@
name: Reset docker state
on: workflow_dispatch
jobs:
reset:
runs-on: host-arch-x86_64
name: Reset docker state
steps:
- name: Stop all containers
run: docker stop $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove all containers
run: docker rm $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove extra volumes
run: docker volume rm road_vision_postgres_data road_vision_pgadmin-data

View File

@@ -0,0 +1,71 @@
name: Component testing
on: [push, workflow_dispatch]
jobs:
hub-test:
name: Hub testing
runs-on: host-arch-x86_64
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build Hub testing container
working-directory: IoT-Systems
run: docker build -t local/hub/${{gitea.sha}} -f hub/Dockerfile-test .
- name: Run Hub tests
working-directory: IoT-Systems
run: docker run --rm -it local/hub/${{gitea.sha}}
- name: Clean up containers
if: ${{always()}}
run: docker image rm local/hub/${{gitea.sha}}
store-test:
name: Store testing
runs-on: host-arch-x86_64
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build Store testing container
working-directory: IoT-Systems
run: docker build -t local/store/${{gitea.sha}} -f store/Dockerfile-test .
- name: Run Store tests
working-directory: IoT-Systems
run: docker run --rm -it local/store/${{gitea.sha}}
- name: Clean up containers
if: ${{always()}}
run: docker image rm local/store/${{gitea.sha}}
integration-smoke-test:
name: Integration smoke testing
runs-on: host-arch-x86_64
needs:
- hub-test
- store-test
steps:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build all production containers
working-directory: IoT-Systems
run: docker-compose build
- name: Start all production containers
working-directory: IoT-Systems
run: docker-compose up -d
- name: Wait for crashes to happen
run: sleep 30
- name: Check for dead containers
working-directory: IoT-Systems
run: docker ps -a | python3 utils/check-up.py
- name: Clean up
if: ${{always()}}
working-directory: IoT-Systems
run: docker-compose down -v

File diff suppressed because one or more lines are too long

View File

@@ -72,9 +72,10 @@ class Datasource:
)
new_points = [
(
processed_agent_data.latitude,
processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state,
processed_agent_data.user_id
)
for processed_agent_data in processed_agent_data_list
]

View File

@@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class Accelerometer:
x: float
y: float
z: float

50
MapView/fileReader.py Normal file
View File

@@ -0,0 +1,50 @@
from csv import reader
import config
from domain.accelerometer import Accelerometer
class FileReader:
def __init__(
self, data_filename: str,
) -> None:
self.file_path = data_filename
pass
def read(self):
return self.getNextValue()
def startReading(self, *args, **kwargs):
self.file = open(self.file_path, newline='')
self.file_reader = reader(self.file, skipinitialspace=True)
file_header = next(self.file_reader)
self.x_idx = file_header.index('X')
self.y_idx = file_header.index('Y')
self.z_idx = file_header.index('Z')
def getNextValue(self):
while True:
row = next(self.file_reader, None)
if row is None:
self._rewind_file()
continue
try:
x = int(row[self.x_idx])
y = int(row[self.y_idx])
z = int(row[self.z_idx])
return Accelerometer(x=x, y=y, z=z)
except Exception as e:
continue
def _rewind_file(self):
self.file.seek(0)
self.file_reader = reader(self.file)
next(self.file_reader)
def stopReading(self, *args, **kwargs):
if self.file:
self.file.close()
self.file_reader = None

View File

@@ -5,47 +5,141 @@ from kivy.clock import Clock
from lineMapLayer import LineMapLayer
from datasource import Datasource
line_layer_colors = [
[1, 0, 0, 1],
[1, 0.5, 0, 1],
[0, 1, 0, 1],
[0, 1, 1, 1],
[0, 0, 1, 1],
[1, 0, 1, 1],
]
class MapViewApp(App):
def __init__(self, **kwargs):
super().__init__()
super().__init__(**kwargs)
self.mapview = None
self.datasource = Datasource(user_id=1)
self.line_layers = dict()
self.car_markers = dict()
# додати необхідні змінні
self.bump_markers = []
self.pothole_markers = []
def on_start(self):
"""
Встановлює необхідні маркери, викликає функцію для оновлення мапи
"""
self.update()
Clock.schedule_interval(self.update, 0.1)
def update(self, *args):
"""
Викликається регулярно для оновлення мапи
"""
new_points = self.datasource.get_new_points()
if not new_points:
return
def update_car_marker(self, point):
for point in new_points:
lat, lon, road_state, user_id = point
# Оновлює лінію маршрута
if user_id not in self.line_layers:
self.line_layers[user_id] = LineMapLayer(color = line_layer_colors[user_id % len(line_layer_colors)])
self.mapview.add_layer(self.line_layers[user_id])
self.line_layers[user_id].add_point((lat, lon))
# Оновлює маркер маниши
self.update_car_marker(lat, lon, user_id)
# Перевіряємо стан дороги
self.check_road_quality(point)
def check_road_quality(self, point):
"""
Аналізує дані акселерометра для подальшого визначення
та відображення ям та лежачих поліцейських
"""
if len(point) < 3:
return
lat, lon, road_state, user_id = point
if road_state == "pothole":
self.set_pothole_marker((lat, lon))
elif road_state == "bump":
self.set_bump_marker((lat, lon))
def update_car_marker(self, lat, lon, user_id):
"""
Оновлює відображення маркера машини на мапі
:param point: GPS координати
"""
if user_id not in self.car_markers:
self.car_markers[user_id] = MapMarker(lat=lat, lon=lon, source='./images/car.png')
self.mapview.add_marker(self.car_markers[user_id])
else:
self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon
if user_id == 1:
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point):
"""
Встановлює маркер для ями
:param point: GPS координати
"""
if isinstance(point, dict):
lat = point.get("lat")
lon = point.get("lon")
else:
lat, lon = point
if lat is None or lon is None:
return
marker = MapMarker(
lat=lat,
lon=lon,
source="images/pothole.png"
)
self.mapview.add_marker(marker)
self.pothole_markers.append(marker)
def set_bump_marker(self, point):
"""
Встановлює маркер для лежачого поліцейського
:param point: GPS координати
"""
if isinstance(point, dict):
lat = point.get("lat")
lon = point.get("lon")
else:
lat, lon = point
if lat is None or lon is None:
return
marker = MapMarker(
lat=lat,
lon=lon,
source="images/bump.png"
)
self.mapview.add_marker(marker)
self.bump_markers.append(marker)
def build(self):
"""
Ініціалізує мапу MapView(zoom, lat, lon)
:return: мапу
"""
self.mapview = MapView()
self.mapview = MapView(
zoom=15,
lat=50.4501,
lon=30.5234
)
return self.mapview

View File

@@ -1,34 +0,0 @@
name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
fake_agent:
container_name: agent
build:
context: ../../
dockerfile: agent/Dockerfile
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
networks:
mqtt_network:
networks:
mqtt_network:

View File

@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None
USER_ID = 1
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass
class Accelerometer:
x: int
y: int
z: int
x: float
y: float
z: float

View File

@@ -15,6 +15,7 @@ class FileDatasource:
def __init__(
self,
acc_divisor: float,
accelerometer_filename: str,
gps_filename: str,
park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs):
"""Must be called before read()"""
if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
def _parse_acc(self, row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
x = int(row[0]) / self.acc_divisor
y = int(row[1]) / self.acc_divisor
z = int(row[2]) / self.acc_divisor
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -1,6 +1,7 @@
from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import logging
import config
@@ -15,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client()
client.on_connect = on_connect
client.connect(broker, port)
@@ -28,16 +31,18 @@ def publish(client, topic, datasource):
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0]
if status != 0:
print(f"Failed to send message to topic {topic}")
logging.error(f"Failed to send message to topic {topic}")
def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema):
x = fields.Int()
y = fields.Int()
z = fields.Int()
x = fields.Float()
y = fields.Float()
z = fields.Float()

View File

@@ -1,12 +1,12 @@
name: "road_vision__hub"
name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
- ./agent/docker/mosquitto:/mosquitto
- ./agent/docker/mosquitto/data:/mosquitto/data
- ./agent/docker/mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
@@ -14,6 +14,43 @@ services:
mqtt_network:
fake_agent:
container_name: agent
build:
context: .
dockerfile: agent/Dockerfile
depends_on:
- mqtt
environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
networks:
mqtt_network:
edge:
container_name: edge
build:
context: .
dockerfile: edge/Dockerfile
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "hub"
HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
postgres_db:
image: postgres:17
container_name: postgres_db
@@ -24,13 +61,12 @@ services:
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
- ./store/docker/db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
@@ -49,12 +85,13 @@ services:
store:
container_name: store
build:
context: ../../
context: .
dockerfile: store/Dockerfile
depends_on:
- postgres_db
restart: always
environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
@@ -79,13 +116,14 @@ services:
hub:
container_name: hub
build:
context: ../../
context: .
dockerfile: hub/Dockerfile
depends_on:
- mqtt
- redis
- store
environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"
@@ -93,7 +131,7 @@ services:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "processed_data_topic"
BATCH_SIZE: 1
BATCH_SIZE: 20
ports:
- "9000:8000"
networks:
@@ -101,10 +139,11 @@ services:
hub_store:
hub_redis:
networks:
mqtt_network:
db_network:
edge_hub:
hub:
hub_store:
hub_redis:

View File

@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
broker_port,
topic,
hub_gateway: HubGateway,
batch_size=10,
):
self.batch_size = batch_size
# MQTT
self.broker_host = broker_host
self.broker_port = broker_port
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway"""
try:
payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available")
if self.hub_gateway.save_data(processed_data):
logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")
logging.error(f"Error processing MQTT message: {e}")
def connect(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60)
def start(self):
self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")
def loop_forever(self):
self.client.loop_forever()

View File

@@ -14,6 +14,7 @@ class GpsData(BaseModel):
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime

View File

@@ -26,15 +26,8 @@ class AgentGateway(ABC):
pass
@abstractmethod
def start(self):
def loop_forever(self):
"""
Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
Method to await for new messages.
"""
pass

View File

@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data(
agent_data: AgentData,
@@ -12,4 +13,24 @@ def process_agent_data(
Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
"""
# Implement it
user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)

View File

@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
# Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"

View File

@@ -1,50 +0,0 @@
version: "3.9"
# name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 19001:9001
networks:
mqtt_network:
edge:
container_name: edge
build:
context: ../../
dockerfile: edge/Dockerfile
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " "
HUB_HOST: "store"
HUB_PORT: 8000
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
networks:
mqtt_network:
db_network:
edge_hub:
hub:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:

View File

@@ -10,42 +10,51 @@ from config import (
HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
)
if __name__ == "__main__":
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
logging.StreamHandler(),
logging.FileHandler("app.log"),
],
)
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter(
# api_base_url=HUB_URL,
# )
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the configuration
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# This allows easy switching between HTTP and MQTT protocols
if HUB_CONNECTION_TYPE.lower() == "http":
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
hub_adapter = HubHttpAdapter(
api_base_url=HUB_URL,
)
else:
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC,
hub_gateway=hub_adapter,
)
try:
# Connect to the MQTT broker and start listening for messages
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed)
while True:
pass
logging.info("Broker connection success. Waiting for data...")
agent_adapter.loop_forever()
except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.stop()
logging.info("System stopped.")
logging.info("Interrupt signal received. Shutting down...")
agent_adapter.disconnect()
logging.info("Disconnected from MQTT broker.")

12
hub/Dockerfile-test Normal file
View File

@@ -0,0 +1,12 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY hub/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY hub/. .
# Run the main.py script inside the container when it starts
CMD ["./test-entry.sh"]

View File

@@ -13,12 +13,39 @@ class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def processed_agent_data_batch_to_payload(self, processed_agent_data_batch: List[ProcessedAgentData]):
if not processed_agent_data_batch:
return False
# Extract user_id from the first element
user_id = processed_agent_data_batch[0].agent_data.user_id
payload = {
"data": [item.model_dump(mode='json') for item in processed_agent_data_batch],
"user_id": user_id
}
return payload
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
"""
Save the processed road data to the Store API.
Parameters:
processed_agent_data_batch (dict): Processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
# Implement it
payload = self.processed_agent_data_batch_to_payload(processed_agent_data_batch)
if payload == False:
return False
try:
# Perform a POST request to the Store API with a 10-second timeout
response = requests.post(
f"{self.api_base_url}/processed_agent_data/",
json=payload,
timeout=10
)
if response.status_code == 200:
logging.info(f"Batch of {len(processed_agent_data_batch)} items sent to Store.")
return True
else:
logging.error(f"Store API error: {response.status_code} - {response.text}")
return False
except Exception as e:
logging.error(f"Failed to send data to Store: {e}")
return False

View File

@@ -0,0 +1,41 @@
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
def _test_processed_agent_data_batch_to_payload():
processed_data_batch = [
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 1,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 2,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 3,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
]
res = StoreApiAdapter(None).processed_agent_data_batch_to_payload(processed_data_batch)
assert res["data"][0]["agent_data"]["user_id"] == 1
assert res["data"][1]["agent_data"]["user_id"] == 2
assert res["data"][2]["agent_data"]["user_id"] == 3
assert StoreApiAdapter(None).processed_agent_data_batch_to_payload([]) == False
if __name__ == "__main__":
test_functions = [i for i in dir() if i.startswith('_test_')]
for i in test_functions:
print(i)
eval(i)()

View File

@@ -70,18 +70,20 @@ def on_message(client, userdata, msg):
processed_agent_data = ProcessedAgentData.model_validate_json(
payload, strict=True
)
redis_client.lpush(
"processed_agent_data", processed_agent_data.model_dump_json()
)
processed_agent_data_batch: List[ProcessedAgentData] = []
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
processed_agent_data_batch: List[ProcessedAgentData] = []
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
raw_data = redis_client.lpop("processed_agent_data")
if raw_data:
data_item = ProcessedAgentData.model_validate_json(raw_data)
processed_agent_data_batch.append(data_item)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")

3
hub/test-entry.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/sh
PYTHONPATH=$PWD python3 app/adapters/store_api_adapter_test.py

13
store/Dockerfile-test Normal file
View File

@@ -0,0 +1,13 @@
# Use the official Python image as the base image
FROM python:latest
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY store/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY store/. .
# Run the main.py script inside the container when it starts
#CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
CMD ["./test-entry.sh"]

View File

@@ -1,61 +0,0 @@
name: "road_vision__database"
services:
postgres_db:
image: postgres:17
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
volumes:
- pgadmin-data:/var/lib/pgadmin
ports:
- "5050:80"
networks:
db_network:
store:
container_name: store
build:
context: ../../
dockerfile: store/Dockerfile
depends_on:
- postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
ports:
- "8000:8000"
networks:
db_network:
networks:
db_network:
volumes:
postgres_data:
pgadmin-data:

View File

@@ -33,41 +33,48 @@ processed_agent_data = Table(
)
# WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {}
subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept()
if user_id not in subscriptions:
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
subscriptions.add(websocket)
try:
# send already available data
r = processed_agent_data.select()
stored_data = SessionLocal().execute(r).fetchall()
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever
while True:
await websocket.receive_text()
except WebSocketDisconnect:
subscriptions[user_id].remove(websocket)
subscriptions.remove(websocket)
# Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data):
if user_id in subscriptions:
for websocket in subscriptions[user_id]:
await websocket.send_json(json.dumps(data))
async def send_data_to_subscribers(data):
for websocket in subscriptions:
await websocket.send_json(json.dumps([data]))
# FastAPI CRUDL endpoints
@app.post("/processed_agent_data/")
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
session = SessionLocal()
try:
created_data = [
def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
return [
{
"road_state": item.road_state,
"user_id": user_id,
"user_id": item.agent_data.user_id,
"x": item.agent_data.accelerometer.x,
"y": item.agent_data.accelerometer.y,
"z": item.agent_data.accelerometer.z,
@@ -77,13 +84,20 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
}
for item in data
]
@app.post("/processed_agent_data/")
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
session = SessionLocal()
try:
created_data = ProcessedAgentData_to_td(data)
stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data)
result = session.execute(stmt)
created_records = [dict(row._mapping) for row in result.fetchall()]
session.commit()
for record in created_records:
await send_data_to_subscribers(user_id, jsonable_encoder(record))
await send_data_to_subscribers(jsonable_encoder(record))
return created_records
except Exception as err:
session.rollback()

3
store/test-entry.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/sh
PYTHONPATH=$PWD python3 test/main_test.py

39
store/test/main_test.py Normal file
View File

@@ -0,0 +1,39 @@
from schemas import AccelerometerData, AgentData, GpsData, ProcessedAgentData
import main
def _test_ProcessedAgentData_to_td():
processed_data_batch = [
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 1,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 2,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 3,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z")
),
]
res = main.ProcessedAgentData_to_td(processed_data_batch)
assert res[0]["user_id"] == 1
assert res[1]["user_id"] == 2
assert res[2]["user_id"] == 3
if __name__ == "__main__":
test_functions = [i for i in dir() if i.startswith('_test_')]
for i in test_functions:
print(i)
eval(i)()

19
utils/check-up.py Normal file
View File

@@ -0,0 +1,19 @@
import sys
print("Checking for dead containers...")
l = [i for i in sys.stdin.read().split("\n") if i]
header, statuses = l[0], l[1:]
status_index = header.find('STATUS')
name_index = header.find('NAMES')
exit_code = 0
for i in statuses:
if not i[status_index:].startswith("Up "):
service_name = i[name_index:]
print(f"Crash detected in {service_name}")
exit_code = 1
sys.exit(exit_code)