24 Commits

Author SHA1 Message Date
hasslesstech 564fe329f3 Merge remote-tracking branch 'github/lab1_huranets' into lab1_huranets
Test Agent / test-agent-run (push) Successful in 5m49s
2026-02-24 21:00:15 +02:00
bacant150 75613fd4fc Restore input validation for accelerometer parsing 2026-02-24 20:47:00 +02:00
bacant150 a25fbfc3ef Simplify accelerometer parsing and remove int16-specific leftovers 2026-02-24 20:14:56 +02:00
bacant150 ca790e7306 Remove int16 binding from datasource 2026-02-24 19:55:50 +02:00
bacant150 1643767094 Remove int16 range check per review 2026-02-24 15:41:41 +02:00
bacant150 3d94bf3008 Parse accelerometer values as int16 (remove float conversion) 2026-02-24 15:12:49 +02:00
hasslesstech 29196ba400 test
Test Agent / test-agent-run (push) Successful in 5m47s
2026-02-24 14:55:54 +02:00
bacant150 72e9f65b27 Add mosquitto runtime folders to gitignore
Test Agent / test-agent-run (push) Successful in 5m23s
2026-02-24 14:47:21 +02:00
bacant150 2b6bed70d8 Fix timestamp field in AggregatedData 2026-02-24 14:47:21 +02:00
bacant150 21ee14ccd1 Implement a basic FileReader and move sleep into FileDatasource.read() 2026-02-24 14:47:17 +02:00
bacant150 c5d98d53cd Add mosquitto runtime folders to gitignore 2026-02-24 14:25:27 +02:00
bacant150 07a0e906d8 Fix timestamp field in AggregatedData 2026-02-24 14:21:41 +02:00
hasslesstech b330180909 lab1: add CI/CD testing
Test Agent / test-agent-run (push) Successful in 5m24s
2026-02-24 10:16:28 +02:00
bacant150 c974ac32f6 Implement a basic FileReader and move sleep into FileDatasource.read() 2026-02-23 22:01:11 +02:00
VladiusVostokus b2c7427af0 Merge pull request #1 from Rhinemann/lab1_shved
updated compose file
2026-02-23 16:10:12 +00:00
Rhinemann e45faeb281 updated compose file 2026-02-22 11:22:19 +01:00
diana-tym b8db2fe6ee refactor: change task 2024-03-13 15:14:48 +02:00
Toolf 0c5954f96c add: mapview datasource implementation 2024-02-26 15:16:47 +02:00
Toolf 3a4fa51aa9 fix: naming 2024-02-26 14:54:25 +02:00
Toolf e32ba94adc add: MapView template 2024-02-26 14:53:24 +02:00
Toolf 93cc8d7378 fix: aggregated_data model 2024-02-16 15:49:19 +02:00
Toolf b65670861d add: edge docker-compose.yaml 2024-02-15 16:04:22 +02:00
Toolf 9a179e09e9 add: edge template 2024-02-12 18:21:08 +02:00
Toolf 173a61d117 add: hub template 2024-02-12 18:18:38 +02:00
50 changed files with 1658 additions and 26 deletions
+25
View File
@@ -0,0 +1,25 @@
name: Test Agent
on: [push, workflow_dispatch]

jobs:
  test-agent-run:
    runs-on: arch-x86_64
    steps:
      - name: Fetch the repository
        run: git clone --branch ${{ gitea.ref_name }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
      - name: Build containers
        run: docker-compose -f docker-compose-test.yaml build
        working-directory: sem8-iot-test/agent/docker
      - name: Start MQTT broker
        run: docker-compose -f docker-compose-test.yaml up -d mqtt
        working-directory: sem8-iot-test/agent/docker
      - name: Start agent
        run: docker-compose -f docker-compose-test.yaml run fake_agent
        working-directory: sem8-iot-test/agent/docker
      - name: Clean up
        if: always()
        run: docker-compose -f docker-compose-test.yaml down
        working-directory: sem8-iot-test/agent/docker
+2
View File
@@ -0,0 +1,2 @@
agent/docker/mosquitto/data/
agent/docker/mosquitto/log/
+3
View File
@@ -0,0 +1,3 @@
.idea
venv
__pycache__
+102
View File
@@ -0,0 +1,102 @@
## Laboratory work No. 5
### Topic
Visualization of road surface quality using the Kivy framework.
### Goal
Develop an application that visualizes the car's movement along the road, and the road quality, based on sensor data.
### Setting up the environment and installing the project
Create a virtual environment:
`python -m venv ./venv`
Activate the environment on Linux:
`source ./venv/bin/activate`
Activate the environment on Windows:
`venv\Scripts\activate`
Install the required libraries:
`pip install -r requirements.txt`
### Task
The map is displayed using the [Mapview](https://mapview.readthedocs.io/en/1.0.4/) widget for Kivy.
To visualize the car's movement, you can use a MapMarker and move it according to the GPS data.
Use markers for road irregularities as well; images for the markers can be found in the images folder.
To create and edit the car's route on the map, use the LineMapLayer class and the
`add_point()` function from lineMapLayer.py. Adding a line to the map:
```python
map_layer = LineMapLayer()
mapview.add_layer(map_layer, mode="scatter")
```
To add a delay when rendering the car's movement, you can use
`kivy.clock.Clock.schedule_once()` or `kivy.clock.Clock.schedule_interval()`.
The data to display on the map (coordinates and road state) is read from the database over a websocket.
To retrieve it, use the `get_new_points()` function from datasource.py.
**Template for the project's main file**
```python
from kivy.app import App
from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock
from lineMapLayer import LineMapLayer


class MapViewApp(App):
    def __init__(self, **kwargs):
        super().__init__()
        # add the variables you need

    def on_start(self):
        """
        Sets the required markers and schedules the map update function
        """

    def update(self, *args):
        """
        Called periodically to update the map
        """

    def update_car_marker(self, point):
        """
        Updates the car marker shown on the map
        :param point: GPS coordinates
        """

    def set_pothole_marker(self, point):
        """
        Places a marker for a pothole
        :param point: GPS coordinates
        """

    def set_bump_marker(self, point):
        """
        Places a marker for a speed bump
        :param point: GPS coordinates
        """

    def build(self):
        """
        Initializes the MapView(zoom, lat, lon) map
        :return: the map
        """
        self.mapview = MapView()
        return self.mapview


if __name__ == '__main__':
    MapViewApp().run()
```
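The update cycle the template asks for boils down to: drain the datasource buffer via `get_new_points()`, move the car marker to the newest point, and drop an event marker for each flagged point. A framework-free sketch of that logic (the `FakeDatasource`/`MapState` names are stand-ins for illustration, not part of the assignment code; in Kivy, `update` would run via `Clock.schedule_interval`):

```python
class FakeDatasource:
    """Mimics datasource.get_new_points(): returns buffered points, then clears."""
    def __init__(self, points):
        self._buf = list(points)

    def get_new_points(self):
        points, self._buf = self._buf, []
        return points


class MapState:
    """Tracks what the Kivy app would render: car position plus event markers."""
    def __init__(self):
        self.car_position = None
        self.markers = []

    def update(self, datasource):
        # Same shape the lab's datasource produces: (lat, lon, road_state).
        for lat, lon, state in datasource.get_new_points():
            self.car_position = (lat, lon)
            if state in ("pothole", "bump"):
                self.markers.append((lat, lon, state))


ds = FakeDatasource([(50.45, 30.52, "good"), (50.46, 30.53, "pothole")])
state = MapState()
state.update(ds)  # in Kivy this call would be scheduled, e.g. every 0.1 s
```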
+4
View File
@@ -0,0 +1,4 @@
import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000
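One subtlety in this config module: environment variables are always strings, so the `x or default` pattern leaves `STORE_PORT` as `"8000"` (str) when the variable is set but `8000` (int) otherwise. A hedged variant that normalizes the type (assuming an int port is what the websocket URL builder wants):

```python
import os

# Casting keeps STORE_PORT an int whether it comes from the
# environment (a string) or from the fallback literal.
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = int(os.environ.get("STORE_PORT") or 8000)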
+1
View File
File diff suppressed because one or more lines are too long
+81
View File
@@ -0,0 +1,81 @@
import asyncio
import json
from datetime import datetime

import websockets
from kivy import Logger
from pydantic import BaseModel, field_validator

from config import STORE_HOST, STORE_PORT


# Pydantic models
class ProcessedAgentData(BaseModel):
    road_state: str
    user_id: int
    x: float
    y: float
    z: float
    latitude: float
    longitude: float
    timestamp: datetime

    @field_validator("timestamp", mode="before")
    @classmethod
    def check_timestamp(cls, value):
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
            )


class Datasource:
    def __init__(self, user_id: int):
        self.index = 0
        self.user_id = user_id
        self.connection_status = None
        self._new_points = []
        asyncio.ensure_future(self.connect_to_server())

    def get_new_points(self):
        Logger.debug(self._new_points)
        points = self._new_points
        self._new_points = []
        return points

    async def connect_to_server(self):
        uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
        while True:
            Logger.debug("CONNECT TO SERVER")
            async with websockets.connect(uri) as websocket:
                self.connection_status = "Connected"
                try:
                    while True:
                        data = await websocket.recv()
                        parsed_data = json.loads(data)
                        self.handle_received_data(parsed_data)
                except websockets.ConnectionClosedOK:
                    self.connection_status = "Disconnected"
                    Logger.debug("SERVER DISCONNECT")

    def handle_received_data(self, data):
        # Update your UI or perform actions with received data here
        Logger.debug(f"Received data: {data}")
        processed_agent_data_list = sorted(
            [
                ProcessedAgentData(**processed_data_json)
                for processed_data_json in json.loads(data)
            ],
            key=lambda v: v.timestamp,
        )
        new_points = [
            (
                processed_agent_data.latitude,
                processed_agent_data.longitude,
                processed_agent_data.road_state,
            )
            for processed_agent_data in processed_agent_data_list
        ]
        self._new_points.extend(new_points)
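`handle_received_data` does two things: sorts the incoming records by timestamp, then projects each to a `(latitude, longitude, road_state)` tuple for the map layer. The same transformation on plain dicts (pydantic validation omitted for brevity; the payload values here are made up for illustration):

```python
import json
from datetime import datetime

payload = json.dumps([
    {"latitude": 50.46, "longitude": 30.53, "road_state": "pothole",
     "timestamp": "2026-02-24T14:47:22"},
    {"latitude": 50.45, "longitude": 30.52, "road_state": "good",
     "timestamp": "2026-02-24T14:47:21"},
])

# Sort oldest-first so the car marker replays the route in order.
records = sorted(json.loads(payload),
                 key=lambda r: datetime.fromisoformat(r["timestamp"]))
points = [(r["latitude"], r["longitude"], r["road_state"]) for r in records]
```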
Binary file not shown (new image, 2.4 KiB)
Binary file not shown (new image, 4.0 KiB)
Binary file not shown (new image, 4.3 KiB)
+147
View File
@@ -0,0 +1,147 @@
from kivy_garden.mapview import MapLayer, MapMarker
from kivy.graphics import Color, Line
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
from kivy_garden.mapview.utils import clamp
from kivy_garden.mapview.constants import (
    MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE,
)
from math import radians, log, tan, cos, pi


class LineMapLayer(MapLayer):
    def __init__(self, coordinates=None, color=[0, 0, 1, 1], width=2, **kwargs):
        super().__init__(**kwargs)
        # if coordinates is None:
        #     coordinates = [[0, 0], [0, 0]]
        self._coordinates = coordinates
        self.color = color
        self._line_points = None
        self._line_points_offset = (0, 0)
        self.zoom = 0
        self.lon = 0
        self.lat = 0
        self.ms = 0
        self._width = width

    @property
    def coordinates(self):
        return self._coordinates

    @coordinates.setter
    def coordinates(self, coordinates):
        self._coordinates = coordinates
        self.invalidate_line_points()
        self.clear_and_redraw()

    def add_point(self, point):
        if self._coordinates is None:
            # self._coordinates = [point]
            self._coordinates = []
        self._coordinates.append(point)
        # self._coordinates = [self._coordinates[-1], point]
        self.invalidate_line_points()
        self.clear_and_redraw()

    @property
    def line_points(self):
        if self._line_points is None:
            self.calc_line_points()
        return self._line_points

    @property
    def line_points_offset(self):
        if self._line_points is None:
            self.calc_line_points()
        return self._line_points_offset

    def calc_line_points(self):
        # Offset all points by the coordinates of the first point,
        # to keep coordinates closer to zero.
        # (and therefore avoid some float precision issues when drawing lines)
        self._line_points_offset = (self.get_x(self.coordinates[0][1]),
                                    self.get_y(self.coordinates[0][0]))
        # Since lat is not a linear transform we must compute manually
        self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
                              self.get_y(lat) - self._line_points_offset[1])
                             for lat, lon in self.coordinates]

    def invalidate_line_points(self):
        self._line_points = None
        self._line_points_offset = (0, 0)

    def get_x(self, lon):
        """Get the x position on the map using this map source's projection
        (0, 0) is located at the top left.
        """
        return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0

    def get_y(self, lat):
        """Get the y position on the map using this map source's projection
        (0, 0) is located at the top left.
        """
        lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
        return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0

    # Function called when the MapView is moved
    def reposition(self):
        map_view = self.parent
        # Must redraw when the zoom changes
        # as the scatter transform resets for the new tiles
        if self.zoom != map_view.zoom or \
                self.lon != round(map_view.lon, 7) or \
                self.lat != round(map_view.lat, 7):
            map_source = map_view.map_source
            self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
            self.invalidate_line_points()
            self.clear_and_redraw()

    def clear_and_redraw(self, *args):
        with self.canvas:
            # Clear old line
            self.canvas.clear()
            self._draw_line()

    def _draw_line(self, *args):
        if self._coordinates is None:
            return
        map_view = self.parent
        self.zoom = map_view.zoom
        self.lon = map_view.lon
        self.lat = map_view.lat
        # When zooming we must undo the current scatter transform
        # or the animation distorts it
        scatter = map_view._scatter
        sx, sy, ss = scatter.x, scatter.y, scatter.scale
        # Account for map source tile size and map view zoom
        vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale
        with self.canvas:
            self.opacity = 0.5
            # Save the current coordinate space context
            PushMatrix()
            # Offset by the MapView's position in the window (always 0,0 ?)
            Translate(*map_view.pos)
            # Undo the scatter animation transform
            Scale(1 / ss, 1 / ss, 1)
            Translate(-sx, -sy)
            # Apply the get window xy from transforms
            Scale(vs, vs, 1)
            Translate(-vx, -vy)
            # Apply what we can factor out of the mapsource long, lat to x, y conversion
            Translate(self.ms / 2, 0)
            # Translate by the offset of the line points
            # (this keeps the points closer to the origin)
            Translate(*self.line_points_offset)
            Color(*self.color)
            Line(points=self.line_points, width=self._width)
            # Retrieve the last saved coordinate space context
            PopMatrix()
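`get_x`/`get_y` above are the Web Mercator projection: longitude maps linearly, latitude goes through the Mercator formula, and `ms = 2**zoom * tile_size` is the world size in pixels. A standalone check of the same formulas (tile size 256 assumed, as is typical for slippy maps; the negated latitude matches the layer's flipped y axis):

```python
from math import radians, log, tan, cos, pi

def get_x(lon, ms):
    # Longitude is a linear map: 360 degrees span ms pixels.
    return lon * ms / 360.0

def get_y(lat, ms):
    # Same formula as LineMapLayer.get_y, with the latitude negated.
    lat = radians(-lat)
    return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * ms / 2.0

ms = pow(2.0, 0) * 256  # world size in pixels at zoom 0
center = (get_x(0, ms), get_y(0, ms))  # equator / prime meridian
```

Note that the x axis here is centered on the prime meridian; the layer shifts it by `ms / 2` later, inside `_draw_line`'s `Translate(self.ms / 2, 0)`.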
+55
View File
@@ -0,0 +1,55 @@
import asyncio
from kivy.app import App
from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock
from lineMapLayer import LineMapLayer
from datasource import Datasource


class MapViewApp(App):
    def __init__(self, **kwargs):
        super().__init__()
        # add the variables you need

    def on_start(self):
        """
        Sets the required markers and schedules the map update function
        """

    def update(self, *args):
        """
        Called periodically to update the map
        """

    def update_car_marker(self, point):
        """
        Updates the car marker shown on the map
        :param point: GPS coordinates
        """

    def set_pothole_marker(self, point):
        """
        Places a marker for a pothole
        :param point: GPS coordinates
        """

    def set_bump_marker(self, point):
        """
        Places a marker for a speed bump
        :param point: GPS coordinates
        """

    def build(self):
        """
        Initializes the MapView(zoom, lat, lon) map
        :return: the map
        """
        self.mapview = MapView()
        return self.mapview


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(MapViewApp().async_run(async_lib="asyncio"))
    loop.close()
+15
View File
@@ -0,0 +1,15 @@
annotated-types==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
docutils==0.20.1
idna==3.6
Kivy==2.3.0
Kivy-Garden==0.1.5
kivy-garden.mapview==1.0.6
pydantic==2.6.2
pydantic_core==2.16.3
Pygments==2.17.2
requests==2.31.0
typing_extensions==4.10.0
urllib3==2.2.1
websockets==12.0
+34
View File
@@ -0,0 +1,34 @@
version: "3.3"
#name: "road_vision"
services:
  mqtt:
    image: eclipse-mosquitto
    container_name: mqtt
    volumes:
      - ./mosquitto:/mosquitto
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    ports:
      - 1883:1883
      - 9001:9001
    networks:
      mqtt_network:
  fake_agent:
    container_name: agent
    build: ../
    depends_on:
      - mqtt
    environment:
      MQTT_BROKER_HOST: "mqtt"
      MQTT_BROKER_PORT: 1883
      MQTT_TOPIC: "agent_data_topic"
      DELAY: 0.1
      MAX_SENDS: 30
    networks:
      mqtt_network:
networks:
  mqtt_network:
+2 -2
View File
@@ -1,5 +1,5 @@
-version: "3.9"
+version: "3.3"
-name: "road_vision"
+#name: "road_vision"
 services:
   mqtt:
     image: eclipse-mosquitto
+4
View File
@@ -8,6 +8,7 @@ def try_parse(type, value: str):
     return None
+USER_ID = 1
 # MQTT config
 MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
@@ -15,3 +16,6 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
 # Delay for sending data to mqtt in seconds
 DELAY = try_parse(float, os.environ.get("DELAY")) or 1
+# Testing switches for CI/CD
+MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))
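The `try_parse` helper turns a missing or malformed environment variable into `None`, so `or` can supply a default; `MAX_SENDS` deliberately has no default and stays `None` outside CI, which keeps the agent publishing forever. A standalone sketch (the helper's body is elided in the diff, so this reconstruction is an assumption):

```python
import os

def try_parse(type_, value):
    # int(None) raises TypeError, int("oops") raises ValueError:
    # both collapse to None so the caller can `or` in a default.
    try:
        return type_(value)
    except (TypeError, ValueError):
        return None

os.environ["DELAY"] = "0.1"
os.environ.pop("MAX_SENDS", None)  # simulate the variable being unset
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))  # unset -> None
```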
+2 -1
View File
@@ -9,4 +9,5 @@ from domain.gps import Gps
 class AggregatedData:
     accelerometer: Accelerometer
     gps: Gps
-    time: datetime
+    timestamp: datetime
+    user_id: int
+222 -11
View File
@@ -1,21 +1,232 @@
-from csv import reader
from __future__ import annotations
import csv
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from domain.accelerometer import Accelerometer
from domain.gps import Gps
from domain.aggregated_data import AggregatedData
import config


class FileDatasource:
-    def __init__(
-        self,
-        accelerometer_filename: str,
-        gps_filename: str,
-    ) -> None:
-        pass
    def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
        self.accelerometer_filename = accelerometer_filename
        self.gps_filename = gps_filename
        self._acc_f = None
        self._gps_f = None
        self._acc_reader: Optional[csv.reader] = None
        self._gps_reader: Optional[csv.reader] = None
        self._started = False
        # one-row buffers (supports CSVs with or without header)
        self._acc_buf: Optional[List[str]] = None
        self._gps_buf: Optional[List[str]] = None
        self._acc_has_header: Optional[bool] = None
        self._gps_has_header: Optional[bool] = None

    def startReading(self, *args, **kwargs):
-        """The method must be called before reading data starts"""
        """Must be called before read()"""
        if self._started:
            return
        if not Path(self.accelerometer_filename).exists():
            raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
        if not Path(self.gps_filename).exists():
            raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
        self._open_files()
        self._started = True

    def stopReading(self, *args, **kwargs):
-        """The method must be called to finish reading data"""
        """Must be called when finishing reading"""
        self._close_files()
        self._started = False

    def read(self) -> AggregatedData:
-        """The method returns the data received from the sensors"""
        """Return one combined sample (acc + gps)."""
        if not self._started:
            raise RuntimeError("Datasource is not started. Call startReading() before read().")
        acc_row = self._next_acc_row()
        gps_row = self._next_gps_row()
        acc = self._parse_acc(acc_row)
        gps = self._parse_gps(gps_row)
        # IMPORTANT: timing belongs to datasource (not MQTT / main.py)
        if config.DELAY and config.DELAY > 0:
            time.sleep(float(config.DELAY))
        return AggregatedData(
            accelerometer=acc,
            gps=gps,
            timestamp=datetime.utcnow(),
            user_id=config.USER_ID,
        )

    # ---------------- internal ----------------
    def _open_files(self) -> None:
        self._close_files()
        self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
        self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
        self._acc_reader = csv.reader(self._acc_f)
        self._gps_reader = csv.reader(self._gps_f)
        self._acc_buf = None
        self._gps_buf = None
        self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
            self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
        )
        self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
            self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
        )

    def _close_files(self) -> None:
        for f in (self._acc_f, self._gps_f):
            try:
                if f is not None:
                    f.close()
            except Exception:
                pass
        self._acc_f = None
        self._gps_f = None
        self._acc_reader = None
        self._gps_reader = None
        self._acc_buf = None
        self._gps_buf = None
        self._acc_has_header = None
        self._gps_has_header = None

    def _rewind_acc(self) -> None:
        if self._acc_f is None:
            raise RuntimeError("Accelerometer file is not open.")
        self._acc_f.seek(0)
        self._acc_reader = csv.reader(self._acc_f)
        self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
            self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
        )

    def _rewind_gps(self) -> None:
        if self._gps_f is None:
            raise RuntimeError("GPS file is not open.")
        self._gps_f.seek(0)
        self._gps_reader = csv.reader(self._gps_f)
        self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
            self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
        )

    def _next_acc_row(self) -> List[str]:
        if self._acc_reader is None:
            raise RuntimeError("Accelerometer reader is not initialized.")
        while True:
            if self._acc_buf is not None:
                row = self._acc_buf
                self._acc_buf = None
            else:
                row = next(self._acc_reader, None)
                if row is None:
                    # EOF -> rewind & continue
                    self._rewind_acc()
                    continue
            row = [c.strip() for c in row]
            if not row or not any(row):
                continue
            return row

    def _next_gps_row(self) -> List[str]:
        if self._gps_reader is None:
            raise RuntimeError("GPS reader is not initialized.")
        while True:
            if self._gps_buf is not None:
                row = self._gps_buf
                self._gps_buf = None
            else:
                row = next(self._gps_reader, None)
                if row is None:
                    # EOF -> rewind & continue
                    self._rewind_gps()
                    continue
            row = [c.strip() for c in row]
            if not row or not any(row):
                continue
            return row

    @staticmethod
    def _detect_header_and_buffer(
        rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...]
    ) -> tuple[bool, Optional[List[str]]]:
        first = None
        while True:
            first = next(rdr, None)
            if first is None:
                return False, None
            first = [c.strip() for c in first]
            if first and any(first):
                break
        norm = [c.lower() for c in first]
        # Header if it contains the expected tokens
        if all(tok in norm for tok in header_tokens):
            return True, None
        # If first row is numeric-like and has enough columns => it's data (buffer it back)
        if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]):
            return False, first
        # Otherwise treat it as header-ish (skip it)
        return True, None

    @staticmethod
    def _parse_acc(row: List[str]) -> Accelerometer:
        if len(row) < 3:
            raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
        try:
            x = int(row[0])
            y = int(row[1])
            z = int(row[2])
        except ValueError as e:
            raise ValueError(
                f"Invalid accelerometer values (expected integers): {row}"
            ) from e
        return Accelerometer(x=x, y=y, z=z)

    @staticmethod
    def _parse_gps(row: List[str]) -> Gps:
        if len(row) < 2:
            raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
        lon = float(row[0])
        lat = float(row[1])
        return Gps(longitude=lon, latitude=lat)

    @staticmethod
    def _is_number(s: str) -> bool:
        try:
            float(s)
            return True
        except Exception:
            return False
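The header-detection heuristic above peeks at the first non-empty row: named columns make it a header, a numeric-looking row of the right width is data (and gets buffered for re-reading), anything else is treated as header-ish. A minimal reproduction of that decision on in-memory CSVs:

```python
import csv
import io

def looks_like_header(first_row, expected_cols, header_tokens):
    norm = [c.strip().lower() for c in first_row]
    if all(tok in norm for tok in header_tokens):
        return True  # explicitly named columns

    def is_number(s):
        try:
            float(s)
            return True
        except ValueError:
            return False

    # A numeric-looking row of the expected width is data, not a header.
    return not (len(norm) >= expected_cols
                and all(is_number(x) for x in norm[:expected_cols]))

acc_with_header = next(csv.reader(io.StringIO("x,y,z\n1,2,3\n")))
acc_bare = next(csv.reader(io.StringIO("1,2,3\n")))
```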
+12 -11
View File
@@ -1,6 +1,4 @@
from paho.mqtt import client as mqtt_client
-import json
-import time
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import config
@@ -24,29 +22,32 @@ def connect_mqtt(broker, port):
    return client


-def publish(client, topic, datasource, delay):
def publish(client, topic, datasource, max_sends=None):
    datasource.startReading()
    i = 0
    while True:
-        time.sleep(delay)
        i += 1
        data = datasource.read()
        msg = AggregatedDataSchema().dumps(data)
        result = client.publish(topic, msg)
-        # result: [0, 1]
        status = result[0]
-        if status == 0:
-            pass
-            # print(f"Send `{msg}` to topic `{topic}`")
-        else:
        if status != 0:
            print(f"Failed to send message to topic {topic}")
        if max_sends and i >= max_sends:
            # display test success
            exit(0)


def run():
    # Prepare mqtt client
    client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
    # Prepare datasource
-    datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
    datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
    # Infinity publish data
-    publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
    publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))


if __name__ == "__main__":
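The reworked `publish()` moves pacing into the datasource and adds an optional `max_sends` cap so CI runs terminate. The cap logic can be exercised in isolation with stand-in client and datasource classes (names here are illustrative; `return` replaces the agent's `exit(0)`):

```python
def publish(client, datasource, topic, max_sends=None):
    sent = 0
    while True:
        sent += 1
        client.publish(topic, datasource.read())
        # With max_sends=None the loop never breaks, matching the
        # production behavior of publishing forever.
        if max_sends and sent >= max_sends:
            return sent  # the real agent calls exit(0) here

class FakeClient:
    def __init__(self):
        self.messages = []
    def publish(self, topic, msg):
        self.messages.append((topic, msg))

class FakeDatasource:
    def read(self):
        return "{}"

client = FakeClient()
n = publish(client, FakeDatasource(), "agent_data_topic", max_sends=3)
```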
+2 -1
View File
@@ -6,4 +6,5 @@ from schema.gps_schema import GpsSchema
 class AggregatedDataSchema(Schema):
     accelerometer = fields.Nested(AccelerometerSchema)
     gps = fields.Nested(GpsSchema)
-    time = fields.DateTime("iso")
+    timestamp = fields.DateTime("iso")
+    user_id = fields.Int()
+2
View File
@@ -0,0 +1,2 @@
venv
app.log
+11
View File
@@ -0,0 +1,11 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY . .
# Run the main.py script inside the container when it starts
CMD ["python", "main.py"]
+76
View File
@@ -0,0 +1,76 @@
import logging

import paho.mqtt.client as mqtt

from app.interfaces.agent_gateway import AgentGateway
from app.entities.agent_data import AgentData, GpsData
from app.usecases.data_processing import process_agent_data
from app.interfaces.hub_gateway import HubGateway


class AgentMQTTAdapter(AgentGateway):
    def __init__(
        self,
        broker_host,
        broker_port,
        topic,
        hub_gateway: HubGateway,
        batch_size=10,
    ):
        self.batch_size = batch_size
        # MQTT
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topic = topic
        self.client = mqtt.Client()
        # Hub
        self.hub_gateway = hub_gateway

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            logging.info("Connected to MQTT broker")
            self.client.subscribe(self.topic)
        else:
            logging.info(f"Failed to connect to MQTT broker with code: {rc}")

    def on_message(self, client, userdata, msg):
        """Process agent data and send it to the hub gateway"""
        try:
            payload: str = msg.payload.decode("utf-8")
            # Create AgentData instance with the received data
            agent_data = AgentData.model_validate_json(payload, strict=True)
            # Process the received data (you can call a use case here if needed)
            processed_data = process_agent_data(agent_data)
            # Store the agent_data in the database (you can send it to the data processing module)
            if not self.hub_gateway.save_data(processed_data):
                logging.error("Hub is not available")
        except Exception as e:
            logging.info(f"Error processing MQTT message: {e}")

    def connect(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_host, self.broker_port, 60)

    def start(self):
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()


# Usage example:
if __name__ == "__main__":
    broker_host = "localhost"
    broker_port = 1883
    topic = "agent_data_topic"
    # Assuming you have implemented the StoreGateway and passed it to the adapter
    store_gateway = HubGateway()
    adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
    adapter.connect()
    adapter.start()
    try:
        # Keep the adapter running in the background
        while True:
            pass
    except KeyboardInterrupt:
        adapter.stop()
        logging.info("Adapter stopped.")
+29
View File
@@ -0,0 +1,29 @@
import logging

import requests as requests

from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.hub_gateway import HubGateway


class HubHttpAdapter(HubGateway):
    def __init__(self, api_base_url):
        self.api_base_url = api_base_url

    def save_data(self, processed_data: ProcessedAgentData):
        """
        Save the processed road data to the Hub.
        Parameters:
            processed_data (ProcessedAgentData): Processed road data to be saved.
        Returns:
            bool: True if the data is successfully saved, False otherwise.
        """
        url = f"{self.api_base_url}/processed_agent_data/"
        response = requests.post(url, data=processed_data.model_dump_json())
        if response.status_code != 200:
            logging.info(
                f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
            )
            return False
        return True
+50
View File
@@ -0,0 +1,50 @@
import logging

import requests as requests
from paho.mqtt import client as mqtt_client

from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.hub_gateway import HubGateway


class HubMqttAdapter(HubGateway):
    def __init__(self, broker, port, topic):
        self.broker = broker
        self.port = port
        self.topic = topic
        self.mqtt_client = self._connect_mqtt(broker, port)

    def save_data(self, processed_data: ProcessedAgentData):
        """
        Save the processed road data to the Hub.
        Parameters:
            processed_data (ProcessedAgentData): Processed road data to be saved.
        Returns:
            bool: True if the data is successfully saved, False otherwise.
        """
        msg = processed_data.model_dump_json()
        result = self.mqtt_client.publish(self.topic, msg)
        status = result[0]
        if status == 0:
            return True
        else:
            print(f"Failed to send message to topic {self.topic}")
            return False

    @staticmethod
    def _connect_mqtt(broker, port):
        """Create MQTT client"""
        print(f"CONNECT TO {broker}:{port}")

        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print(f"Connected to MQTT Broker ({broker}:{port})!")
            else:
                print(f"Failed to connect to {broker}:{port}, return code {rc}")
                exit(rc)  # Stop execution

        client = mqtt_client.Client()
        client.on_connect = on_connect
        client.connect(broker, port)
        client.loop_start()
        return client
+32
View File
@@ -0,0 +1,32 @@
from datetime import datetime
from pydantic import BaseModel, field_validator


class AccelerometerData(BaseModel):
    x: float
    y: float
    z: float


class GpsData(BaseModel):
    latitude: float
    longitude: float


class AgentData(BaseModel):
    accelerometer: AccelerometerData
    gps: GpsData
    timestamp: datetime

    @field_validator("timestamp", mode="before")
    @classmethod
    def parse_timestamp(cls, value):
        # Convert the timestamp to a datetime object
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
            )
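The before-mode validator accepts either an existing `datetime` or an ISO 8601 string. The same acceptance rule in isolation (note that `datetime.fromisoformat` on Python < 3.11 rejects a trailing `Z`, so senders should emit offset-style timestamps or none at all):

```python
from datetime import datetime

def check_timestamp(value):
    # Pass datetimes through untouched; parse ISO 8601 strings;
    # reject everything else with a ValueError.
    if isinstance(value, datetime):
        return value
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        raise ValueError("Invalid timestamp format. Expected ISO 8601.")

ts = check_timestamp("2026-02-24T14:47:21")
```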
@@ -0,0 +1,7 @@
from pydantic import BaseModel
from app.entities.agent_data import AgentData


class ProcessedAgentData(BaseModel):
    road_state: str
    agent_data: AgentData
+40
View File
@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod


class AgentGateway(ABC):
    """
    Abstract class representing the Agent Gateway interface.
    All agent gateway adapters must implement these methods.
    """

    @abstractmethod
    def on_message(self, client, userdata, msg):
        """
        Method to handle incoming messages from the agent.
        Parameters:
            client: MQTT client instance.
            userdata: Any additional user data passed to the MQTT client.
            msg: The MQTT message received from the agent.
        """
        pass

    @abstractmethod
    def connect(self):
        """
        Method to establish a connection to the agent.
        """
        pass

    @abstractmethod
    def start(self):
        """
        Method to start listening for messages from the agent.
        """
        pass

    @abstractmethod
    def stop(self):
        """
        Method to stop the agent gateway and clean up resources.
        """
        pass
+20
View File
@@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from app.entities.processed_agent_data import ProcessedAgentData


class HubGateway(ABC):
    """
    Abstract class representing the Store Gateway interface.
    All store gateway adapters must implement these methods.
    """

    @abstractmethod
    def save_data(self, processed_data: ProcessedAgentData) -> bool:
        """
        Method to save the processed agent data in the database.
        Parameters:
            processed_data (ProcessedAgentData): The processed agent data to be saved.
        Returns:
            bool: True if the data is successfully saved, False otherwise.
        """
        pass
+15
View File
@@ -0,0 +1,15 @@
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData


def process_agent_data(
    agent_data: AgentData,
) -> ProcessedAgentData:
    """
    Process agent data and classify the state of the road surface.
    Parameters:
        agent_data (AgentData): Agent data containing accelerometer, GPS, and timestamp readings.
    Returns:
        processed_data_batch (ProcessedAgentData): Processed data containing the classified road surface state and the agent data.
    """
    # Implement it
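The hub leaves the classification open ("Implement it"). One possible heuristic, classifying by how far the vertical acceleration deviates from a resting baseline; the thresholds and the gravity baseline here are entirely illustrative, not part of the assignment:

```python
def classify_road_state(z, baseline=9.8, bump_threshold=2.0, pothole_threshold=5.0):
    """Classify the road state from the vertical accelerometer reading.

    Assumption: z is in m/s^2 and hovers around `baseline` on smooth road.
    The thresholds are illustrative placeholders.
    """
    deviation = abs(z - baseline)
    if deviation >= pothole_threshold:
        return "pothole"
    if deviation >= bump_threshold:
        return "bump"
    return "good"

states = [classify_road_state(z) for z in (9.9, 12.5, 16.0)]
```

In the real use case, the returned label would populate `ProcessedAgentData.road_state`, with the original `agent_data` attached alongside it.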
+24
View File
@@ -0,0 +1,24 @@
import os
def try_parse_int(value: str):
    try:
        return int(value)
    except Exception:
        return None
# Configuration for agent MQTT
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
# Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
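The `try_parse_int(...) or default` pattern used throughout this config falls back to the default when the variable is unset or unparsable, but also, perhaps surprisingly, when it parses to 0, since 0 is falsy. A small demonstration (`DEMO_PORT` is a hypothetical variable for illustration):

```python
import os

def try_parse_int(value):
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

os.environ.pop("DEMO_PORT", None)  # ensure the variable is unset
port = try_parse_int(os.environ.get("DEMO_PORT")) or 1883  # falls back to 1883

os.environ["DEMO_PORT"] = "12000"
port = try_parse_int(os.environ.get("DEMO_PORT")) or 1883  # parses to 12000

# Pitfall: a legitimate 0 is falsy, so `or` silently substitutes the default
zero = try_parse_int("0") or 1883  # yields 1883, not 0
```

None of the ports here can legitimately be 0, so the pattern is harmless in this config, but it is worth knowing before reusing it elsewhere.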
@@ -0,0 +1,49 @@
version: "3.9"
# name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 19001:9001
networks:
mqtt_network:
user: 1000:1000
edge:
container_name: edge
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " "
HUB_HOST: "store"
HUB_PORT: 8000
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
networks:
mqtt_network:
db_network:
edge_hub:
hub:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:
@@ -0,0 +1,11 @@
persistence true
persistence_location /mosquitto/data/
listener 1883
## Authentication ##
allow_anonymous true
# allow_anonymous false
# password_file /mosquitto/config/password.txt
## Log ##
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
# listener 1883
@@ -0,0 +1,51 @@
import logging
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
from app.adapters.hub_http_adapter import HubHttpAdapter
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
from config import (
MQTT_BROKER_HOST,
MQTT_BROKER_PORT,
MQTT_TOPIC,
HUB_URL,
HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC,
)
if __name__ == "__main__":
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter(
# api_base_url=HUB_URL,
# )
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the configuration
agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC,
hub_gateway=hub_adapter,
)
try:
# Connect to the MQTT broker and start listening for messages
agent_adapter.connect()
agent_adapter.start()
# Keep the system running; sleep instead of busy-waiting so the loop
# does not spin at 100% CPU
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.stop()
logging.info("System stopped.")
@@ -0,0 +1,10 @@
annotated-types==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
idna==3.6
paho-mqtt==1.6.1
pydantic==2.6.1
pydantic_core==2.16.2
requests==2.31.0
typing_extensions==4.9.0
urllib3==2.2.0
@@ -0,0 +1,2 @@
venv
__pycache__
@@ -0,0 +1,11 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY . .
# Start the application with uvicorn when the container starts
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
@@ -0,0 +1,33 @@
# Hub
## Instructions for Starting the Project
To start the Hub, follow these steps:
1. Clone the repository to your local machine:
```bash
git clone https://github.com/Toolf/hub.git
cd hub
```
2. Create and activate a virtual environment (optional but recommended):
```bash
python -m venv venv
source venv/bin/activate # On Windows, use: venv\Scripts\activate
```
3. Install the project dependencies:
```bash
pip install -r requirements.txt
```
4. Run the system:
```bash
python ./app/main.py
```
The system will start collecting data from the agent through MQTT and processing it.
## Running Tests
To run tests for the project, use the following command:
```bash
python -m unittest discover tests
```
## Common Commands
### 1. Saving Requirements
To save the project dependencies to the requirements.txt file:
```bash
pip freeze > requirements.txt
```
@@ -0,0 +1,24 @@
import json
import logging
from typing import List
import pydantic_core
import requests
from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.store_gateway import StoreGateway
class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
"""
Save the processed road data to the Store API.
Parameters:
processed_agent_data_batch (List[ProcessedAgentData]): Batch of processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
# Implement it
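A minimal sketch of this adapter, posting the batch to the Store API with `requests`. The endpoint path and the injectable `post` parameter (which makes the adapter testable without a live API) are assumptions, not part of the original code:

```python
import logging
from typing import Callable, List

import requests

class StoreApiAdapter:
    # Sketch only: the endpoint path and the `post` parameter are assumptions
    def __init__(self, api_base_url: str, post: Callable = requests.post):
        self.api_base_url = api_base_url
        self._post = post  # injectable for testing without a live API

    def save_data(self, processed_agent_data_batch: List) -> bool:
        try:
            response = self._post(
                f"{self.api_base_url}/processed_agent_data/",
                json=[item.model_dump(mode="json")
                      for item in processed_agent_data_batch],
            )
            # Treat any 2xx creation/success response as saved
            return response.status_code in (200, 201)
        except requests.RequestException:
            logging.exception("Failed to save batch to the Store API")
            return False
```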
@@ -0,0 +1,33 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@field_validator('timestamp', mode='before')
@classmethod
def parse_timestamp(cls, value):
# Convert the timestamp to a datetime object
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
# fromisoformat() in Python < 3.11 does not accept a trailing "Z"
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
pass
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)
@@ -0,0 +1,7 @@
from pydantic import BaseModel
from app.entities.agent_data import AgentData
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData
@@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from typing import List
from app.entities.processed_agent_data import ProcessedAgentData
class StoreGateway(ABC):
"""
Abstract class representing the Store Gateway interface.
All store gateway adapters must implement these methods.
"""
@abstractmethod
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
"""
Method to save the processed agent data in the database.
Parameters:
processed_agent_data_batch (List[ProcessedAgentData]): The batch of processed agent data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
pass
@@ -0,0 +1,26 @@
import os
def try_parse_int(value: str):
try:
return int(value)
except (TypeError, ValueError):
return None
# Configuration for the Store API
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
# Configuration for Redis
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
# Configuration for hub logic
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
# MQTT
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"
@@ -0,0 +1,11 @@
CREATE TABLE processed_agent_data (
id SERIAL PRIMARY KEY,
road_state VARCHAR(255) NOT NULL,
user_id INTEGER NOT NULL,
x FLOAT,
y FLOAT,
z FLOAT,
latitude FLOAT,
longitude FLOAT,
timestamp TIMESTAMP
);
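The table flattens the nested `ProcessedAgentData` model into scalar columns. A sketch of the corresponding row mapping (the `to_row` helper name is hypothetical; the column order follows the `CREATE TABLE` above, with `id` generated by the database):

```python
from types import SimpleNamespace

def to_row(processed):
    # Flatten a ProcessedAgentData-like object into the column order of
    # the processed_agent_data table; `id` is assigned by SERIAL
    a = processed.agent_data
    return (
        processed.road_state,
        a.user_id,
        a.accelerometer.x, a.accelerometer.y, a.accelerometer.z,
        a.gps.latitude, a.gps.longitude,
        a.timestamp,
    )
```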
@@ -0,0 +1,111 @@
version: "3.9"
name: "road_vision__hub"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
postgres_db:
image: postgres:latest
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
volumes:
- pgadmin-data:/var/lib/pgadmin
ports:
- "5050:80"
networks:
db_network:
store:
container_name: store
build: ../../store
depends_on:
- postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
ports:
- "8000:8000"
networks:
db_network:
hub_store:
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
networks:
hub_redis:
hub:
container_name: hub
build: ../
depends_on:
- mqtt
- redis
- store
environment:
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"
REDIS_PORT: 6379
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "processed_data_topic"
BATCH_SIZE: 1
ports:
- "9000:8000"
networks:
mqtt_network:
hub_store:
hub_redis:
networks:
mqtt_network:
db_network:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:
@@ -0,0 +1,11 @@
persistence true
persistence_location /mosquitto/data/
listener 1883
## Authentication ##
allow_anonymous true
# allow_anonymous false
# password_file /mosquitto/config/password.txt
## Log ##
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
# listener 1883
@@ -0,0 +1,96 @@
import logging
from typing import List
from fastapi import FastAPI
from redis import Redis
import paho.mqtt.client as mqtt
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.processed_agent_data import ProcessedAgentData
from config import (
STORE_API_BASE_URL,
REDIS_HOST,
REDIS_PORT,
BATCH_SIZE,
MQTT_TOPIC,
MQTT_BROKER_HOST,
MQTT_BROKER_PORT,
)
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Create an instance of the Redis using the configuration
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
# Create an instance of the StoreApiAdapter using the configuration
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
# FastAPI
app = FastAPI()
@app.post("/processed_agent_data/")
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
processed_agent_data_batch: List[ProcessedAgentData] = []
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
logging.info(f"Flushing batch of {len(processed_agent_data_batch)} records to the Store API")
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
# MQTT
client = mqtt.Client()
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker")
client.subscribe(MQTT_TOPIC)
else:
logging.error(f"Failed to connect to MQTT broker with code: {rc}")
def on_message(client, userdata, msg):
try:
payload: str = msg.payload.decode("utf-8")
# Create ProcessedAgentData instance with the received data
processed_agent_data = ProcessedAgentData.model_validate_json(
payload, strict=True
)
redis_client.lpush(
"processed_agent_data", processed_agent_data.model_dump_json()
)
processed_agent_data_batch: List[ProcessedAgentData] = []
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
except Exception as e:
logging.error(f"Error processing MQTT message: {e}")
# Connect
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
# Start
client.loop_start()
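The batch-flush logic appears twice in this file, once in the HTTP endpoint and once in the MQTT handler. It could be factored into a shared helper; a sketch, where the `flush_batch` name and signature are assumptions and the Redis/store clients are passed in so the helper stays testable:

```python
from typing import Callable, List

def flush_batch(redis_client, store_adapter, batch_size: int,
                parse: Callable, key: str = "processed_agent_data") -> bool:
    # Pop up to batch_size items off the Redis list and forward them to the
    # store adapter; returns False when too few items are buffered
    if redis_client.llen(key) < batch_size:
        return False
    batch: List = [parse(redis_client.lpop(key)) for _ in range(batch_size)]
    store_adapter.save_data(processed_agent_data_batch=batch)
    return True
```

Both the endpoint and `on_message` could then push the incoming record and call `flush_batch(redis_client, store_adapter, BATCH_SIZE, ProcessedAgentData.model_validate_json)`.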
@@ -0,0 +1,60 @@
import unittest
from unittest.mock import Mock
import redis
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
from app.interfaces.store_gateway import StoreGateway
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.usecases.data_processing import process_agent_data_batch
class TestAgentMQTTAdapter(unittest.TestCase):
def setUp(self):
# Create a mock StoreGateway for testing
self.mock_store_gateway = Mock(spec=StoreGateway)
self.mock_redis = Mock(spec=redis.Redis)
# Create the AgentMQTTAdapter instance with the mock StoreGateway
self.agent_adapter = AgentMQTTAdapter(
broker_host="test_broker",
broker_port=1234,
topic="test_topic",
store_gateway=self.mock_store_gateway,
redis_client=self.mock_redis,
batch_size=1,
)
def test_on_message_valid_data(self):
# Test handling of valid incoming MQTT message
# (Assuming data is in the correct JSON format)
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
self.mock_redis.llen.return_value = 1
self.mock_redis.rpop.return_value = valid_json_data
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is called once with the correct arguments
expected_agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
self.mock_store_gateway.save_data.assert_called_once_with(
process_agent_data_batch([expected_agent_data])
)
def test_on_message_invalid_data(self):
# Test handling of invalid incoming MQTT message
# (Assuming data is missing required fields or has incorrect format)
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
self.mock_store_gateway.save_data.assert_not_called()
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,72 @@
import requests
import unittest
from unittest.mock import Mock, patch
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
class TestStoreApiAdapter(unittest.TestCase):
def setUp(self):
# Create the StoreApiAdapter instance
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
@patch.object(requests, "post")
def test_save_data_success(self, mock_post):
# Test successful saving of data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=201) # 201 indicates successful creation
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is True, indicating successful saving
self.assertTrue(result)
@patch.object(requests, "post")
def test_save_data_failure(self, mock_post):
# Test failure to save data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=400) # 400 indicates a client error
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is False, indicating failure to save
self.assertFalse(result)
if __name__ == "__main__":
unittest.main()