Compare commits
10 Commits
abd6bf0abe
...
b2c7427af0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2c7427af0 | ||
| e45faeb281 | |||
|
|
b8db2fe6ee | ||
|
|
0c5954f96c | ||
|
|
3a4fa51aa9 | ||
|
|
e32ba94adc | ||
|
|
93cc8d7378 | ||
|
|
b65670861d | ||
|
|
9a179e09e9 | ||
|
|
173a61d117 |
3
MapView/.gitignore
vendored
Normal file
3
MapView/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
.idea
|
||||||
|
venv
|
||||||
|
__pycache__
|
||||||
102
MapView/README.md
Normal file
102
MapView/README.md
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
## Лабораторна робота №5
|
||||||
|
### Тема
|
||||||
|
|
||||||
|
Візуалізація якості стану дорожнього покриття за допомогою фреймворку Kivy.
|
||||||
|
|
||||||
|
### Мета
|
||||||
|
|
||||||
|
Розробити програму для візуалізації руху машини на дорозі та якості дороги за допомогою даних датчиків.
|
||||||
|
|
||||||
|
### Підготовка робочого середовище, встановлення проекту
|
||||||
|
|
||||||
|
Створення віртуального середовища
|
||||||
|
|
||||||
|
`python -m venv ./venv`
|
||||||
|
|
||||||
|
Активація середовища для linux
|
||||||
|
|
||||||
|
`source ./venv/bin/activate`
|
||||||
|
|
||||||
|
Активація середовища для windows
|
||||||
|
|
||||||
|
`venv\Scripts\activate`
|
||||||
|
|
||||||
|
Встановлення необхідних бібліотек
|
||||||
|
|
||||||
|
`pip install -r requirements.txt`
|
||||||
|
|
||||||
|
### Завдання
|
||||||
|
|
||||||
|
Для відображення мапи використовується віджет [Mapview](https://mapview.readthedocs.io/en/1.0.4/) для Kivy.
|
||||||
|
|
||||||
|
Для візуалізації руху машини можна використовувати MapMarker та рухати його відповідно GPS даних.
|
||||||
|
Для позначення нерівностей на дорозі також використовувати маркери. Зображення для маркерів можна знайти в папці images.
|
||||||
|
|
||||||
|
Для створення та редагування маршруту машини на мапі використовуйте клас LineMapLayer та функцію
|
||||||
|
`add_point()` з файлу lineMapLayer.py. Додавання лінії на мапу:
|
||||||
|
|
||||||
|
```python
|
||||||
|
map_layer = LineMapLayer()
|
||||||
|
mapview.add_layer(map_layer, mode="scatter")
|
||||||
|
```
|
||||||
|
|
||||||
|
Щоб створити затримку при відображенні руху машини можна використовувати
|
||||||
|
функцію `kivy.clock.Clock.schedule_once()` або `kivy.clock.Clock.schedule_interval()`.
|
||||||
|
|
||||||
|
Дані для відображення на мапі (координати та стан дороги) зчитуються з бази даних через вебсокет.
|
||||||
|
Для їх отримання використовуйте функцію `get_new_points()` з datasource.py.
|
||||||
|
|
||||||
|
**Шаблон основного файлу проєкту**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from kivy.app import App
|
||||||
|
from kivy_garden.mapview import MapMarker, MapView
|
||||||
|
from kivy.clock import Clock
|
||||||
|
from lineMapLayer import LineMapLayer
|
||||||
|
|
||||||
|
|
||||||
|
class MapViewApp(App):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__()
|
||||||
|
# додати необхідні змінні
|
||||||
|
|
||||||
|
def on_start(self):
|
||||||
|
"""
|
||||||
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update(self, *args):
|
||||||
|
"""
|
||||||
|
Викликається регулярно для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update_car_marker(self, point):
|
||||||
|
"""
|
||||||
|
Оновлює відображення маркера машини на мапі
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_pothole_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для ями
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_bump_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для лежачого поліцейського
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
"""
|
||||||
|
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||||
|
:return: мапу
|
||||||
|
"""
|
||||||
|
self.mapview = MapView()
|
||||||
|
return self.mapview
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
MapViewApp().run()
|
||||||
|
```
|
||||||
4
MapView/config.py
Normal file
4
MapView/config.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
||||||
|
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
||||||
1
MapView/data.csv
Normal file
1
MapView/data.csv
Normal file
File diff suppressed because one or more lines are too long
81
MapView/datasource.py
Normal file
81
MapView/datasource.py
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
import websockets
|
||||||
|
from kivy import Logger
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
from config import STORE_HOST, STORE_PORT
|
||||||
|
|
||||||
|
|
||||||
|
# Pydantic models
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
user_id: int
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def check_timestamp(cls, value):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Datasource:
|
||||||
|
def __init__(self, user_id: int):
|
||||||
|
self.index = 0
|
||||||
|
self.user_id = user_id
|
||||||
|
self.connection_status = None
|
||||||
|
self._new_points = []
|
||||||
|
asyncio.ensure_future(self.connect_to_server())
|
||||||
|
|
||||||
|
def get_new_points(self):
|
||||||
|
Logger.debug(self._new_points)
|
||||||
|
points = self._new_points
|
||||||
|
self._new_points = []
|
||||||
|
return points
|
||||||
|
|
||||||
|
async def connect_to_server(self):
|
||||||
|
uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
|
||||||
|
while True:
|
||||||
|
Logger.debug("CONNECT TO SERVER")
|
||||||
|
async with websockets.connect(uri) as websocket:
|
||||||
|
self.connection_status = "Connected"
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await websocket.recv()
|
||||||
|
parsed_data = json.loads(data)
|
||||||
|
self.handle_received_data(parsed_data)
|
||||||
|
except websockets.ConnectionClosedOK:
|
||||||
|
self.connection_status = "Disconnected"
|
||||||
|
Logger.debug("SERVER DISCONNECT")
|
||||||
|
|
||||||
|
def handle_received_data(self, data):
|
||||||
|
# Update your UI or perform actions with received data here
|
||||||
|
Logger.debug(f"Received data: {data}")
|
||||||
|
processed_agent_data_list = sorted(
|
||||||
|
[
|
||||||
|
ProcessedAgentData(**processed_data_json)
|
||||||
|
for processed_data_json in json.loads(data)
|
||||||
|
],
|
||||||
|
key=lambda v: v.timestamp,
|
||||||
|
)
|
||||||
|
new_points = [
|
||||||
|
(
|
||||||
|
processed_agent_data.latitude,
|
||||||
|
processed_agent_data.longitude,
|
||||||
|
processed_agent_data.road_state,
|
||||||
|
)
|
||||||
|
for processed_agent_data in processed_agent_data_list
|
||||||
|
]
|
||||||
|
self._new_points.extend(new_points)
|
||||||
BIN
MapView/images/bump.png
Normal file
BIN
MapView/images/bump.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 2.4 KiB |
BIN
MapView/images/car.png
Normal file
BIN
MapView/images/car.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.0 KiB |
BIN
MapView/images/pothole.png
Normal file
BIN
MapView/images/pothole.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.3 KiB |
147
MapView/lineMapLayer.py
Normal file
147
MapView/lineMapLayer.py
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
from kivy_garden.mapview import MapLayer, MapMarker
|
||||||
|
from kivy.graphics import Color, Line
|
||||||
|
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
|
||||||
|
from kivy_garden.mapview.utils import clamp
|
||||||
|
from kivy_garden.mapview.constants import (MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE)
|
||||||
|
from math import radians, log, tan, cos, pi
|
||||||
|
|
||||||
|
|
||||||
|
class LineMapLayer(MapLayer):
|
||||||
|
def __init__(self, coordinates=None, color=[0, 0, 1, 1], width=2, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
# if coordinates is None:
|
||||||
|
# coordinates = [[0, 0], [0, 0]]
|
||||||
|
self._coordinates = coordinates
|
||||||
|
self.color = color
|
||||||
|
self._line_points = None
|
||||||
|
self._line_points_offset = (0, 0)
|
||||||
|
self.zoom = 0
|
||||||
|
self.lon = 0
|
||||||
|
self.lat = 0
|
||||||
|
self.ms = 0
|
||||||
|
self._width = width
|
||||||
|
|
||||||
|
@property
|
||||||
|
def coordinates(self):
|
||||||
|
return self._coordinates
|
||||||
|
|
||||||
|
@coordinates.setter
|
||||||
|
def coordinates(self, coordinates):
|
||||||
|
self._coordinates = coordinates
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
def add_point(self, point):
|
||||||
|
if self._coordinates is None:
|
||||||
|
#self._coordinates = [point]
|
||||||
|
self._coordinates = []
|
||||||
|
self._coordinates.append(point)
|
||||||
|
# self._coordinates = [self._coordinates[-1], point]
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def line_points(self):
|
||||||
|
if self._line_points is None:
|
||||||
|
self.calc_line_points()
|
||||||
|
return self._line_points
|
||||||
|
|
||||||
|
@property
|
||||||
|
def line_points_offset(self):
|
||||||
|
if self._line_points is None:
|
||||||
|
self.calc_line_points()
|
||||||
|
return self._line_points_offset
|
||||||
|
|
||||||
|
def calc_line_points(self):
|
||||||
|
# Offset all points by the coordinates of the first point,
|
||||||
|
# to keep coordinates closer to zero.
|
||||||
|
# (and therefore avoid some float precision issues when drawing lines)
|
||||||
|
self._line_points_offset = (self.get_x(self.coordinates[0][1]),
|
||||||
|
self.get_y(self.coordinates[0][0]))
|
||||||
|
# Since lat is not a linear transform we must compute manually
|
||||||
|
self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
|
||||||
|
self.get_y(lat) - self._line_points_offset[1])
|
||||||
|
for lat, lon in self.coordinates]
|
||||||
|
|
||||||
|
def invalidate_line_points(self):
|
||||||
|
self._line_points = None
|
||||||
|
self._line_points_offset = (0, 0)
|
||||||
|
|
||||||
|
def get_x(self, lon):
|
||||||
|
"""Get the x position on the map using this map source's projection
|
||||||
|
(0, 0) is located at the top left.
|
||||||
|
"""
|
||||||
|
return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0
|
||||||
|
|
||||||
|
def get_y(self, lat):
|
||||||
|
"""Get the y position on the map using this map source's projection
|
||||||
|
(0, 0) is located at the top left.
|
||||||
|
"""
|
||||||
|
lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
|
||||||
|
return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0
|
||||||
|
|
||||||
|
# Function called when the MapView is moved
|
||||||
|
def reposition(self):
|
||||||
|
map_view = self.parent
|
||||||
|
|
||||||
|
# Must redraw when the zoom changes
|
||||||
|
# as the scatter transform resets for the new tiles
|
||||||
|
if self.zoom != map_view.zoom or \
|
||||||
|
self.lon != round(map_view.lon, 7) or \
|
||||||
|
self.lat != round(map_view.lat, 7):
|
||||||
|
map_source = map_view.map_source
|
||||||
|
self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
def clear_and_redraw(self, *args):
|
||||||
|
with self.canvas:
|
||||||
|
# Clear old line
|
||||||
|
self.canvas.clear()
|
||||||
|
|
||||||
|
self._draw_line()
|
||||||
|
|
||||||
|
def _draw_line(self, *args):
|
||||||
|
if self._coordinates is None:
|
||||||
|
return
|
||||||
|
map_view = self.parent
|
||||||
|
self.zoom = map_view.zoom
|
||||||
|
self.lon = map_view.lon
|
||||||
|
self.lat = map_view.lat
|
||||||
|
|
||||||
|
# When zooming we must undo the current scatter transform
|
||||||
|
# or the animation distorts it
|
||||||
|
scatter = map_view._scatter
|
||||||
|
sx, sy, ss = scatter.x, scatter.y, scatter.scale
|
||||||
|
|
||||||
|
# Account for map source tile size and map view zoom
|
||||||
|
vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale
|
||||||
|
|
||||||
|
with self.canvas:
|
||||||
|
self.opacity = 0.5
|
||||||
|
# Save the current coordinate space context
|
||||||
|
PushMatrix()
|
||||||
|
|
||||||
|
# Offset by the MapView's position in the window (always 0,0 ?)
|
||||||
|
Translate(*map_view.pos)
|
||||||
|
|
||||||
|
# Undo the scatter animation transform
|
||||||
|
Scale(1 / ss, 1 / ss, 1)
|
||||||
|
Translate(-sx, -sy)
|
||||||
|
|
||||||
|
# Apply the get window xy from transforms
|
||||||
|
Scale(vs, vs, 1)
|
||||||
|
Translate(-vx, -vy)
|
||||||
|
|
||||||
|
# Apply what we can factor out of the mapsource long, lat to x, y conversion
|
||||||
|
Translate(self.ms / 2, 0)
|
||||||
|
|
||||||
|
# Translate by the offset of the line points
|
||||||
|
# (this keeps the points closer to the origin)
|
||||||
|
Translate(*self.line_points_offset)
|
||||||
|
|
||||||
|
Color(*self.color)
|
||||||
|
Line(points=self.line_points, width=self._width)
|
||||||
|
|
||||||
|
# Retrieve the last saved coordinate space context
|
||||||
|
PopMatrix()
|
||||||
55
MapView/main.py
Normal file
55
MapView/main.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import asyncio
|
||||||
|
from kivy.app import App
|
||||||
|
from kivy_garden.mapview import MapMarker, MapView
|
||||||
|
from kivy.clock import Clock
|
||||||
|
from lineMapLayer import LineMapLayer
|
||||||
|
from datasource import Datasource
|
||||||
|
|
||||||
|
|
||||||
|
class MapViewApp(App):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__()
|
||||||
|
# додати необхідні змінні
|
||||||
|
|
||||||
|
def on_start(self):
|
||||||
|
"""
|
||||||
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update(self, *args):
|
||||||
|
"""
|
||||||
|
Викликається регулярно для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def update_car_marker(self, point):
|
||||||
|
"""
|
||||||
|
Оновлює відображення маркера машини на мапі
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_pothole_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для ями
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_bump_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для лежачого поліцейського
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
"""
|
||||||
|
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||||
|
:return: мапу
|
||||||
|
"""
|
||||||
|
self.mapview = MapView()
|
||||||
|
return self.mapview
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(MapViewApp().async_run(async_lib="asyncio"))
|
||||||
|
loop.close()
|
||||||
15
MapView/requirements.txt
Normal file
15
MapView/requirements.txt
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
annotated-types==0.6.0
|
||||||
|
certifi==2024.2.2
|
||||||
|
charset-normalizer==3.3.2
|
||||||
|
docutils==0.20.1
|
||||||
|
idna==3.6
|
||||||
|
Kivy==2.3.0
|
||||||
|
Kivy-Garden==0.1.5
|
||||||
|
kivy-garden.mapview==1.0.6
|
||||||
|
pydantic==2.6.2
|
||||||
|
pydantic_core==2.16.3
|
||||||
|
Pygments==2.17.2
|
||||||
|
requests==2.31.0
|
||||||
|
typing_extensions==4.10.0
|
||||||
|
urllib3==2.2.1
|
||||||
|
websockets==12.0
|
||||||
@ -1,4 +1,3 @@
|
|||||||
version: "3.9"
|
|
||||||
name: "road_vision"
|
name: "road_vision"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
|
|||||||
@ -8,6 +8,7 @@ def try_parse(type, value: str):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
USER_ID = 1
|
||||||
# MQTT config
|
# MQTT config
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
|||||||
@ -9,4 +9,5 @@ from domain.gps import Gps
|
|||||||
class AggregatedData:
|
class AggregatedData:
|
||||||
accelerometer: Accelerometer
|
accelerometer: Accelerometer
|
||||||
gps: Gps
|
gps: Gps
|
||||||
time: datetime
|
timestamp: datetime
|
||||||
|
user_id: int
|
||||||
|
|||||||
@ -1,6 +1,9 @@
|
|||||||
from csv import reader
|
from csv import reader
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from domain.accelerometer import Accelerometer
|
||||||
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
|
import config
|
||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
@ -13,6 +16,12 @@ class FileDatasource:
|
|||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def read(self) -> AggregatedData:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
"""Метод повертає дані отримані з датчиків"""
|
||||||
|
return AggregatedData(
|
||||||
|
Accelerometer(1, 2, 3),
|
||||||
|
Gps(4, 5),
|
||||||
|
datetime.now(),
|
||||||
|
config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Метод повинен викликатись перед початком читання даних"""
|
||||||
|
|||||||
@ -6,4 +6,5 @@ from schema.gps_schema import GpsSchema
|
|||||||
class AggregatedDataSchema(Schema):
|
class AggregatedDataSchema(Schema):
|
||||||
accelerometer = fields.Nested(AccelerometerSchema)
|
accelerometer = fields.Nested(AccelerometerSchema)
|
||||||
gps = fields.Nested(GpsSchema)
|
gps = fields.Nested(GpsSchema)
|
||||||
time = fields.DateTime("iso")
|
timestamp = fields.DateTime("iso")
|
||||||
|
user_id = fields.Int()
|
||||||
|
|||||||
2
edge/.gitignore
vendored
Normal file
2
edge/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
app.log
|
||||||
11
edge/Dockerfile
Normal file
11
edge/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:3.9-slim
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY . .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
CMD ["python", "main.py"]
|
||||||
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
import logging
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
from app.interfaces.agent_gateway import AgentGateway
|
||||||
|
from app.entities.agent_data import AgentData, GpsData
|
||||||
|
from app.usecases.data_processing import process_agent_data
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class AgentMQTTAdapter(AgentGateway):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
broker_host,
|
||||||
|
broker_port,
|
||||||
|
topic,
|
||||||
|
hub_gateway: HubGateway,
|
||||||
|
batch_size=10,
|
||||||
|
):
|
||||||
|
self.batch_size = batch_size
|
||||||
|
# MQTT
|
||||||
|
self.broker_host = broker_host
|
||||||
|
self.broker_port = broker_port
|
||||||
|
self.topic = topic
|
||||||
|
self.client = mqtt.Client()
|
||||||
|
# Hub
|
||||||
|
self.hub_gateway = hub_gateway
|
||||||
|
|
||||||
|
def on_connect(self, client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
logging.info("Connected to MQTT broker")
|
||||||
|
self.client.subscribe(self.topic)
|
||||||
|
else:
|
||||||
|
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
||||||
|
|
||||||
|
def on_message(self, client, userdata, msg):
|
||||||
|
"""Processing agent data and sent it to hub gateway"""
|
||||||
|
try:
|
||||||
|
payload: str = msg.payload.decode("utf-8")
|
||||||
|
# Create AgentData instance with the received data
|
||||||
|
agent_data = AgentData.model_validate_json(payload, strict=True)
|
||||||
|
# Process the received data (you can call a use case here if needed)
|
||||||
|
processed_data = process_agent_data(agent_data)
|
||||||
|
# Store the agent_data in the database (you can send it to the data processing module)
|
||||||
|
if not self.hub_gateway.save_data(processed_data):
|
||||||
|
logging.error("Hub is not available")
|
||||||
|
except Exception as e:
|
||||||
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.client.on_connect = self.on_connect
|
||||||
|
self.client.on_message = self.on_message
|
||||||
|
self.client.connect(self.broker_host, self.broker_port, 60)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.client.loop_start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.client.loop_stop()
|
||||||
|
|
||||||
|
|
||||||
|
# Usage example:
|
||||||
|
if __name__ == "__main__":
|
||||||
|
broker_host = "localhost"
|
||||||
|
broker_port = 1883
|
||||||
|
topic = "agent_data_topic"
|
||||||
|
# Assuming you have implemented the StoreGateway and passed it to the adapter
|
||||||
|
store_gateway = HubGateway()
|
||||||
|
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
|
||||||
|
adapter.connect()
|
||||||
|
adapter.start()
|
||||||
|
try:
|
||||||
|
# Keep the adapter running in the background
|
||||||
|
while True:
|
||||||
|
pass
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
adapter.stop()
|
||||||
|
logging.info("Adapter stopped.")
|
||||||
29
edge/app/adapters/hub_http_adapter.py
Normal file
29
edge/app/adapters/hub_http_adapter.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import requests as requests
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class HubHttpAdapter(HubGateway):
|
||||||
|
def __init__(self, api_base_url):
|
||||||
|
self.api_base_url = api_base_url
|
||||||
|
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Hub.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
url = f"{self.api_base_url}/processed_agent_data/"
|
||||||
|
|
||||||
|
response = requests.post(url, data=processed_data.model_dump_json())
|
||||||
|
if response.status_code != 200:
|
||||||
|
logging.info(
|
||||||
|
f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import requests as requests
|
||||||
|
from paho.mqtt import client as mqtt_client
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class HubMqttAdapter(HubGateway):
|
||||||
|
def __init__(self, broker, port, topic):
|
||||||
|
self.broker = broker
|
||||||
|
self.port = port
|
||||||
|
self.topic = topic
|
||||||
|
self.mqtt_client = self._connect_mqtt(broker, port)
|
||||||
|
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Hub.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
msg = processed_data.model_dump_json()
|
||||||
|
result = self.mqtt_client.publish(self.topic, msg)
|
||||||
|
status = result[0]
|
||||||
|
if status == 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"Failed to send message to topic {self.topic}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _connect_mqtt(broker, port):
|
||||||
|
"""Create MQTT client"""
|
||||||
|
print(f"CONNECT TO {broker}:{port}")
|
||||||
|
|
||||||
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
print(f"Connected to MQTT Broker ({broker}:{port})!")
|
||||||
|
else:
|
||||||
|
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||||
|
exit(rc) # Stop execution
|
||||||
|
|
||||||
|
client = mqtt_client.Client()
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.connect(broker, port)
|
||||||
|
client.loop_start()
|
||||||
|
return client
|
||||||
32
edge/app/entities/agent_data.py
Normal file
32
edge/app/entities/agent_data.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def parse_timestamp(cls, value):
|
||||||
|
# Convert the timestamp to a datetime object
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
7
edge/app/entities/processed_agent_data.py
Normal file
7
edge/app/entities/processed_agent_data.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
from pydantic import BaseModel
|
||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
40
edge/app/interfaces/agent_gateway.py
Normal file
40
edge/app/interfaces/agent_gateway.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
|
class AgentGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Agent Gateway interface.
|
||||||
|
All agent gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def on_message(self, client, userdata, msg):
|
||||||
|
"""
|
||||||
|
Method to handle incoming messages from the agent.
|
||||||
|
Parameters:
|
||||||
|
client: MQTT client instance.
|
||||||
|
userdata: Any additional user data passed to the MQTT client.
|
||||||
|
msg: The MQTT message received from the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def connect(self):
|
||||||
|
"""
|
||||||
|
Method to establish a connection to the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def start(self):
|
||||||
|
"""
|
||||||
|
Method to start listening for messages from the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def stop(self):
|
||||||
|
"""
|
||||||
|
Method to stop the agent gateway and clean up resources.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
20
edge/app/interfaces/hub_gateway.py
Normal file
20
edge/app/interfaces/hub_gateway.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
class HubGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Store Gateway interface.
|
||||||
|
All store gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData) -> bool:
|
||||||
|
"""
|
||||||
|
Method to save the processed agent data in the database.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): The processed agent data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
15
edge/app/usecases/data_processing.py
Normal file
15
edge/app/usecases/data_processing.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
def process_agent_data(
|
||||||
|
agent_data: AgentData,
|
||||||
|
) -> ProcessedAgentData:
|
||||||
|
"""
|
||||||
|
Process agent data and classify the state of the road surface.
|
||||||
|
Parameters:
|
||||||
|
agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp.
|
||||||
|
Returns:
|
||||||
|
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||||
|
"""
|
||||||
|
# Implement it
|
||||||
24
edge/config.py
Normal file
24
edge/config.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def try_parse_int(value: str):
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Configuration for agent MQTT
|
||||||
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
||||||
|
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
|
||||||
|
|
||||||
|
# Configuration for hub MQTT
|
||||||
|
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
||||||
|
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
||||||
|
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
|
||||||
|
|
||||||
|
# Configuration for the Hub
|
||||||
|
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
||||||
|
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
|
||||||
|
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
||||||
48
edge/docker/docker-compose.yaml
Normal file
48
edge/docker/docker-compose.yaml
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
version: "3.9"
|
||||||
|
# name: "road_vision"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 19001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
edge:
|
||||||
|
container_name: edge
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: " "
|
||||||
|
HUB_HOST: "store"
|
||||||
|
HUB_PORT: 8000
|
||||||
|
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||||
|
HUB_MQTT_BROKER_PORT: 1883
|
||||||
|
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
edge_hub:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
db_network:
|
||||||
|
edge_hub:
|
||||||
|
hub:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
pgadmin-data:
|
||||||
11
edge/docker/mosquitto/config/mosquitto.conf
Normal file
11
edge/docker/mosquitto/config/mosquitto.conf
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
persistence true
|
||||||
|
persistence_location /mosquitto/data/
|
||||||
|
listener 1883
|
||||||
|
## Authentication ##
|
||||||
|
allow_anonymous true
|
||||||
|
# allow_anonymous false
|
||||||
|
# password_file /mosquitto/config/password.txt
|
||||||
|
## Log ##
|
||||||
|
log_dest file /mosquitto/log/mosquitto.log
|
||||||
|
log_dest stdout
|
||||||
|
# listener 1883
|
||||||
51
edge/main.py
Normal file
51
edge/main.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
import logging
|
||||||
|
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||||
|
from app.adapters.hub_http_adapter import HubHttpAdapter
|
||||||
|
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
||||||
|
from config import (
|
||||||
|
MQTT_BROKER_HOST,
|
||||||
|
MQTT_BROKER_PORT,
|
||||||
|
MQTT_TOPIC,
|
||||||
|
HUB_URL,
|
||||||
|
HUB_MQTT_BROKER_HOST,
|
||||||
|
HUB_MQTT_BROKER_PORT,
|
||||||
|
HUB_MQTT_TOPIC,
|
||||||
|
)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Configure logging settings
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
||||||
|
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(), # Output log messages to the console
|
||||||
|
logging.FileHandler("app.log"), # Save log messages to a file
|
||||||
|
],
|
||||||
|
)
|
||||||
|
# Create an instance of the StoreApiAdapter using the configuration
|
||||||
|
# hub_adapter = HubHttpAdapter(
|
||||||
|
# api_base_url=HUB_URL,
|
||||||
|
# )
|
||||||
|
hub_adapter = HubMqttAdapter(
|
||||||
|
broker=HUB_MQTT_BROKER_HOST,
|
||||||
|
port=HUB_MQTT_BROKER_PORT,
|
||||||
|
topic=HUB_MQTT_TOPIC,
|
||||||
|
)
|
||||||
|
# Create an instance of the AgentMQTTAdapter using the configuration
|
||||||
|
agent_adapter = AgentMQTTAdapter(
|
||||||
|
broker_host=MQTT_BROKER_HOST,
|
||||||
|
broker_port=MQTT_BROKER_PORT,
|
||||||
|
topic=MQTT_TOPIC,
|
||||||
|
hub_gateway=hub_adapter,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
# Connect to the MQTT broker and start listening for messages
|
||||||
|
agent_adapter.connect()
|
||||||
|
agent_adapter.start()
|
||||||
|
# Keep the system running indefinitely (you can add other logic as needed)
|
||||||
|
while True:
|
||||||
|
pass
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
||||||
|
agent_adapter.stop()
|
||||||
|
logging.info("System stopped.")
|
||||||
10
edge/requirements.txt
Normal file
10
edge/requirements.txt
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
annotated-types==0.6.0
|
||||||
|
certifi==2024.2.2
|
||||||
|
charset-normalizer==3.3.2
|
||||||
|
idna==3.6
|
||||||
|
paho-mqtt==1.6.1
|
||||||
|
pydantic==2.6.1
|
||||||
|
pydantic_core==2.16.2
|
||||||
|
requests==2.31.0
|
||||||
|
typing_extensions==4.9.0
|
||||||
|
urllib3==2.2.0
|
||||||
2
hub/.gitignore
vendored
Normal file
2
hub/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
__pycache__
|
||||||
11
hub/Dockerfile
Normal file
11
hub/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:3.9-slim
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY . .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
33
hub/README.md
Normal file
33
hub/README.md
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
# Hub
|
||||||
|
## Instructions for Starting the Project
|
||||||
|
To start the Hub, follow these steps:
|
||||||
|
1. Clone the repository to your local machine:
|
||||||
|
```bash
|
||||||
|
git clone https://github.com/Toolf/hub.git
|
||||||
|
cd hub
|
||||||
|
```
|
||||||
|
2. Create and activate a virtual environment (optional but recommended):
|
||||||
|
```bash
|
||||||
|
python -m venv venv
|
||||||
|
source venv/bin/activate # On Windows, use: venv\Scripts\activate
|
||||||
|
```
|
||||||
|
3. Install the project dependencies:
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
4. Run the system:
|
||||||
|
```bash
|
||||||
|
python ./app/main.py
|
||||||
|
```
|
||||||
|
The system will start collecting data from the agent through MQTT and processing it.
|
||||||
|
## Running Tests
|
||||||
|
To run tests for the project, use the following command:
|
||||||
|
```bash
|
||||||
|
python -m unittest discover tests
|
||||||
|
```
|
||||||
|
## Common Commands
|
||||||
|
### 1. Saving Requirements
|
||||||
|
To save the project dependencies to the requirements.txt file:
|
||||||
|
```bash
|
||||||
|
pip freeze > requirements.txt
|
||||||
|
```
|
||||||
24
hub/app/adapters/store_api_adapter.py
Normal file
24
hub/app/adapters/store_api_adapter.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import pydantic_core
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.store_gateway import StoreGateway
|
||||||
|
|
||||||
|
|
||||||
|
class StoreApiAdapter(StoreGateway):
|
||||||
|
def __init__(self, api_base_url):
|
||||||
|
self.api_base_url = api_base_url
|
||||||
|
|
||||||
|
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Store API.
|
||||||
|
Parameters:
|
||||||
|
processed_agent_data_batch (dict): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
# Implement it
|
||||||
33
hub/app/entities/agent_data.py
Normal file
33
hub/app/entities/agent_data.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
user_id: int
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator('timestamp', mode='before')
|
||||||
|
def parse_timestamp(cls, value):
|
||||||
|
# Convert the timestamp to a datetime object
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
7
hub/app/entities/processed_agent_data.py
Normal file
7
hub/app/entities/processed_agent_data.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
from pydantic import BaseModel
|
||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
21
hub/app/interfaces/store_gateway.py
Normal file
21
hub/app/interfaces/store_gateway.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import List
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
class StoreGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Store Gateway interface.
|
||||||
|
All store gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
|
||||||
|
"""
|
||||||
|
Method to save the processed agent data in the database.
|
||||||
|
Parameters:
|
||||||
|
processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
26
hub/config.py
Normal file
26
hub/config.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def try_parse_int(value: str):
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Configuration for the Store API
|
||||||
|
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
|
||||||
|
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
|
||||||
|
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
|
||||||
|
|
||||||
|
# Configure for Redis
|
||||||
|
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
|
||||||
|
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
|
||||||
|
|
||||||
|
# Configure for hub logic
|
||||||
|
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
|
||||||
|
|
||||||
|
# MQTT
|
||||||
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
||||||
|
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"
|
||||||
11
hub/docker/db/structure.sql
Normal file
11
hub/docker/db/structure.sql
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
CREATE TABLE processed_agent_data (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
road_state VARCHAR(255) NOT NULL,
|
||||||
|
user_id INTEGER NOT NULL,
|
||||||
|
x FLOAT,
|
||||||
|
y FLOAT,
|
||||||
|
z FLOAT,
|
||||||
|
latitude FLOAT,
|
||||||
|
longitude FLOAT,
|
||||||
|
timestamp TIMESTAMP
|
||||||
|
);
|
||||||
111
hub/docker/docker-compose.yaml
Normal file
111
hub/docker/docker-compose.yaml
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
version: "3.9"
|
||||||
|
name: "road_vision__hub"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 9001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
postgres_db:
|
||||||
|
image: postgres:latest
|
||||||
|
container_name: postgres_db
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: user
|
||||||
|
POSTGRES_PASSWORD: pass
|
||||||
|
POSTGRES_DB: test_db
|
||||||
|
volumes:
|
||||||
|
- postgres_data:/var/lib/postgresql/data
|
||||||
|
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||||
|
ports:
|
||||||
|
- "5432:5432"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
|
||||||
|
|
||||||
|
pgadmin:
|
||||||
|
container_name: pgadmin4
|
||||||
|
image: dpage/pgadmin4
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
||||||
|
PGADMIN_DEFAULT_PASSWORD: root
|
||||||
|
volumes:
|
||||||
|
- pgadmin-data:/var/lib/pgadmin
|
||||||
|
ports:
|
||||||
|
- "5050:80"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
|
||||||
|
|
||||||
|
store:
|
||||||
|
container_name: store
|
||||||
|
build: ../../store
|
||||||
|
depends_on:
|
||||||
|
- postgres_db
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: user
|
||||||
|
POSTGRES_PASSWORD: pass
|
||||||
|
POSTGRES_DB: test_db
|
||||||
|
POSTGRES_HOST: postgres_db
|
||||||
|
POSTGRES_PORT: 5432
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
hub_store:
|
||||||
|
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis:latest
|
||||||
|
container_name: redis
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
networks:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
hub:
|
||||||
|
container_name: hub
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
- redis
|
||||||
|
- store
|
||||||
|
environment:
|
||||||
|
STORE_API_HOST: "store"
|
||||||
|
STORE_API_PORT: 8000
|
||||||
|
REDIS_HOST: "redis"
|
||||||
|
REDIS_PORT: 6379
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "processed_data_topic"
|
||||||
|
BATCH_SIZE: 1
|
||||||
|
ports:
|
||||||
|
- "9000:8000"
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
db_network:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
pgadmin-data:
|
||||||
11
hub/docker/mosquitto/config/mosquitto.conf
Normal file
11
hub/docker/mosquitto/config/mosquitto.conf
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
persistence true
|
||||||
|
persistence_location /mosquitto/data/
|
||||||
|
listener 1883
|
||||||
|
## Authentication ##
|
||||||
|
allow_anonymous true
|
||||||
|
# allow_anonymous false
|
||||||
|
# password_file /mosquitto/config/password.txt
|
||||||
|
## Log ##
|
||||||
|
log_dest file /mosquitto/log/mosquitto.log
|
||||||
|
log_dest stdout
|
||||||
|
# listener 1883
|
||||||
96
hub/main.py
Normal file
96
hub/main.py
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
import logging
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from redis import Redis
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from config import (
|
||||||
|
STORE_API_BASE_URL,
|
||||||
|
REDIS_HOST,
|
||||||
|
REDIS_PORT,
|
||||||
|
BATCH_SIZE,
|
||||||
|
MQTT_TOPIC,
|
||||||
|
MQTT_BROKER_HOST,
|
||||||
|
MQTT_BROKER_PORT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Configure logging settings
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
||||||
|
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(), # Output log messages to the console
|
||||||
|
logging.FileHandler("app.log"), # Save log messages to a file
|
||||||
|
],
|
||||||
|
)
|
||||||
|
# Create an instance of the Redis using the configuration
|
||||||
|
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
|
||||||
|
# Create an instance of the StoreApiAdapter using the configuration
|
||||||
|
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
|
||||||
|
# Create an instance of the AgentMQTTAdapter using the configuration
|
||||||
|
|
||||||
|
# FastAPI
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/processed_agent_data/")
|
||||||
|
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
|
||||||
|
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
|
||||||
|
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||||
|
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||||
|
for _ in range(BATCH_SIZE):
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
redis_client.lpop("processed_agent_data")
|
||||||
|
)
|
||||||
|
processed_agent_data_batch.append(processed_agent_data)
|
||||||
|
print(processed_agent_data_batch)
|
||||||
|
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
|
# MQTT
|
||||||
|
client = mqtt.Client()
|
||||||
|
|
||||||
|
|
||||||
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
logging.info("Connected to MQTT broker")
|
||||||
|
client.subscribe(MQTT_TOPIC)
|
||||||
|
else:
|
||||||
|
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
||||||
|
|
||||||
|
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
try:
|
||||||
|
payload: str = msg.payload.decode("utf-8")
|
||||||
|
# Create ProcessedAgentData instance with the received data
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
payload, strict=True
|
||||||
|
)
|
||||||
|
|
||||||
|
redis_client.lpush(
|
||||||
|
"processed_agent_data", processed_agent_data.model_dump_json()
|
||||||
|
)
|
||||||
|
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||||
|
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||||
|
for _ in range(BATCH_SIZE):
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
redis_client.lpop("processed_agent_data")
|
||||||
|
)
|
||||||
|
processed_agent_data_batch.append(processed_agent_data)
|
||||||
|
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||||
|
return {"status": "ok"}
|
||||||
|
except Exception as e:
|
||||||
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# Connect
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
|
||||||
|
|
||||||
|
# Start
|
||||||
|
client.loop_start()
|
||||||
BIN
hub/requirements.txt
Normal file
BIN
hub/requirements.txt
Normal file
Binary file not shown.
60
hub/tests/test_agent_mqtt_adapter.py
Normal file
60
hub/tests/test_agent_mqtt_adapter.py
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock
|
||||||
|
import redis
|
||||||
|
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||||
|
from app.interfaces.store_gateway import StoreGateway
|
||||||
|
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||||
|
from app.usecases.data_processing import process_agent_data_batch
|
||||||
|
|
||||||
|
class TestAgentMQTTAdapter(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
# Create a mock StoreGateway for testing
|
||||||
|
self.mock_store_gateway = Mock(spec=StoreGateway)
|
||||||
|
self.mock_redis = Mock(spec=redis.Redis)
|
||||||
|
# Create the AgentMQTTAdapter instance with the mock StoreGateway
|
||||||
|
self.agent_adapter = AgentMQTTAdapter(
|
||||||
|
broker_host="test_broker",
|
||||||
|
broker_port=1234,
|
||||||
|
topic="test_topic",
|
||||||
|
store_gateway=self.mock_store_gateway,
|
||||||
|
redis_client=self.mock_redis,
|
||||||
|
batch_size=1,
|
||||||
|
)
|
||||||
|
def test_on_message_valid_data(self):
|
||||||
|
# Test handling of valid incoming MQTT message
|
||||||
|
# (Assuming data is in the correct JSON format)
|
||||||
|
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
|
||||||
|
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
|
||||||
|
self.mock_redis.llen.return_value = 1
|
||||||
|
self.mock_redis.rpop.return_value = valid_json_data
|
||||||
|
# Call on_message with the mock message
|
||||||
|
self.agent_adapter.on_message(None, None, mock_msg)
|
||||||
|
# Ensure that the store_gateway's save_data method is called once with the correct arguments
|
||||||
|
expected_agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
self.mock_store_gateway.save_data.assert_called_once_with(
|
||||||
|
process_agent_data_batch([expected_agent_data])
|
||||||
|
)
|
||||||
|
def test_on_message_invalid_data(self):
|
||||||
|
# Test handling of invalid incoming MQTT message
|
||||||
|
# (Assuming data is missing required fields or has incorrect format)
|
||||||
|
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
|
||||||
|
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
|
||||||
|
# Call on_message with the mock message
|
||||||
|
self.agent_adapter.on_message(None, None, mock_msg)
|
||||||
|
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
|
||||||
|
self.mock_store_gateway.save_data.assert_not_called()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
72
hub/tests/test_store_api_adapter.py
Normal file
72
hub/tests/test_store_api_adapter.py
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
import requests
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||||
|
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
class TestStoreApiAdapter(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
# Create the StoreApiAdapter instance
|
||||||
|
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
|
||||||
|
@patch.object(requests, "post")
|
||||||
|
def test_save_data_success(self, mock_post):
|
||||||
|
# Test successful saving of data to the Store API
|
||||||
|
# Sample processed road data
|
||||||
|
agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
||||||
|
# Mock the response from the Store API
|
||||||
|
mock_response = Mock(status_code=201) # 201 indicates successful creation
|
||||||
|
mock_post.return_value = mock_response
|
||||||
|
# Call the save_data method
|
||||||
|
result = self.store_api_adapter.save_data(processed_data)
|
||||||
|
# Ensure that the post method of the mock is called with the correct arguments
|
||||||
|
mock_post.assert_called_once_with(
|
||||||
|
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
||||||
|
)
|
||||||
|
# Ensure that the result is True, indicating successful saving
|
||||||
|
self.assertTrue(result)
|
||||||
|
@patch.object(requests, "post")
|
||||||
|
def test_save_data_failure(self, mock_post):
|
||||||
|
# Test failure to save data to the Store API
|
||||||
|
# Sample processed road data
|
||||||
|
agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
||||||
|
# Mock the response from the Store API
|
||||||
|
mock_response = Mock(status_code=400) # 400 indicates a client error
|
||||||
|
mock_post.return_value = mock_response
|
||||||
|
# Call the save_data method
|
||||||
|
result = self.store_api_adapter.save_data(processed_data)
|
||||||
|
# Ensure that the post method of the mock is called with the correct arguments
|
||||||
|
mock_post.assert_called_once_with(
|
||||||
|
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
||||||
|
)
|
||||||
|
# Ensure that the result is False, indicating failure to save
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user