Compare commits
No commits in common. "b2c7427af0edb37feaaed8b35de12ef1488c0347" and "abd6bf0abeb0a9e4915e93f56c0ffa9ce09988e8" have entirely different histories.
b2c7427af0
...
abd6bf0abe
3
MapView/.gitignore
vendored
3
MapView/.gitignore
vendored
@ -1,3 +0,0 @@
|
|||||||
.idea
|
|
||||||
venv
|
|
||||||
__pycache__
|
|
||||||
@ -1,102 +0,0 @@
|
|||||||
## Лабораторна робота №5
|
|
||||||
### Тема
|
|
||||||
|
|
||||||
Візуалізація якості стану дорожнього покриття за допомогою фреймворку Kivy.
|
|
||||||
|
|
||||||
### Мета
|
|
||||||
|
|
||||||
Розробити програму для візуалізації руху машини на дорозі та якості дороги за допомогою даних датчиків.
|
|
||||||
|
|
||||||
### Підготовка робочого середовище, встановлення проекту
|
|
||||||
|
|
||||||
Створення віртуального середовища
|
|
||||||
|
|
||||||
`python -m venv ./venv`
|
|
||||||
|
|
||||||
Активація середовища для linux
|
|
||||||
|
|
||||||
`source ./venv/bin/activate`
|
|
||||||
|
|
||||||
Активація середовища для windows
|
|
||||||
|
|
||||||
`venv\Scripts\activate`
|
|
||||||
|
|
||||||
Встановлення необхідних бібліотек
|
|
||||||
|
|
||||||
`pip install -r requirements.txt`
|
|
||||||
|
|
||||||
### Завдання
|
|
||||||
|
|
||||||
Для відображення мапи використовується віджет [Mapview](https://mapview.readthedocs.io/en/1.0.4/) для Kivy.
|
|
||||||
|
|
||||||
Для візуалізації руху машини можна використовувати MapMarker та рухати його відповідно GPS даних.
|
|
||||||
Для позначення нерівностей на дорозі також використовувати маркери. Зображення для маркерів можна знайти в папці images.
|
|
||||||
|
|
||||||
Для створення та редагування маршруту машини на мапі використовуйте клас LineMapLayer та функцію
|
|
||||||
`add_point()` з файлу lineMapLayer.py. Додавання лінії на мапу:
|
|
||||||
|
|
||||||
```python
|
|
||||||
map_layer = LineMapLayer()
|
|
||||||
mapview.add_layer(map_layer, mode="scatter")
|
|
||||||
```
|
|
||||||
|
|
||||||
Щоб створити затримку при відображенні руху машини можна використовувати
|
|
||||||
функцію `kivy.clock.Clock.schedule_once()` або `kivy.clock.Clock.schedule_interval()`.
|
|
||||||
|
|
||||||
Дані для відображення на мапі (координати та стан дороги) зчитуються з бази даних через вебсокет.
|
|
||||||
Для їх отримання використовуйте функцію `get_new_points()` з datasource.py.
|
|
||||||
|
|
||||||
**Шаблон основного файлу проєкту**
|
|
||||||
|
|
||||||
```python
|
|
||||||
from kivy.app import App
|
|
||||||
from kivy_garden.mapview import MapMarker, MapView
|
|
||||||
from kivy.clock import Clock
|
|
||||||
from lineMapLayer import LineMapLayer
|
|
||||||
|
|
||||||
|
|
||||||
class MapViewApp(App):
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super().__init__()
|
|
||||||
# додати необхідні змінні
|
|
||||||
|
|
||||||
def on_start(self):
|
|
||||||
"""
|
|
||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
|
||||||
"""
|
|
||||||
|
|
||||||
def update(self, *args):
|
|
||||||
"""
|
|
||||||
Викликається регулярно для оновлення мапи
|
|
||||||
"""
|
|
||||||
|
|
||||||
def update_car_marker(self, point):
|
|
||||||
"""
|
|
||||||
Оновлює відображення маркера машини на мапі
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def set_pothole_marker(self, point):
|
|
||||||
"""
|
|
||||||
Встановлює маркер для ями
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def set_bump_marker(self, point):
|
|
||||||
"""
|
|
||||||
Встановлює маркер для лежачого поліцейського
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def build(self):
|
|
||||||
"""
|
|
||||||
Ініціалізує мапу MapView(zoom, lat, lon)
|
|
||||||
:return: мапу
|
|
||||||
"""
|
|
||||||
self.mapview = MapView()
|
|
||||||
return self.mapview
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
MapViewApp().run()
|
|
||||||
```
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
import os
|
|
||||||
|
|
||||||
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
|
||||||
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
|
||||||
File diff suppressed because one or more lines are too long
@ -1,81 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
|
||||||
from datetime import datetime
|
|
||||||
import websockets
|
|
||||||
from kivy import Logger
|
|
||||||
from pydantic import BaseModel, field_validator
|
|
||||||
from config import STORE_HOST, STORE_PORT
|
|
||||||
|
|
||||||
|
|
||||||
# Pydantic models
|
|
||||||
class ProcessedAgentData(BaseModel):
|
|
||||||
road_state: str
|
|
||||||
user_id: int
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@field_validator("timestamp", mode="before")
|
|
||||||
def check_timestamp(cls, value):
|
|
||||||
if isinstance(value, datetime):
|
|
||||||
return value
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Datasource:
|
|
||||||
def __init__(self, user_id: int):
|
|
||||||
self.index = 0
|
|
||||||
self.user_id = user_id
|
|
||||||
self.connection_status = None
|
|
||||||
self._new_points = []
|
|
||||||
asyncio.ensure_future(self.connect_to_server())
|
|
||||||
|
|
||||||
def get_new_points(self):
|
|
||||||
Logger.debug(self._new_points)
|
|
||||||
points = self._new_points
|
|
||||||
self._new_points = []
|
|
||||||
return points
|
|
||||||
|
|
||||||
async def connect_to_server(self):
|
|
||||||
uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
|
|
||||||
while True:
|
|
||||||
Logger.debug("CONNECT TO SERVER")
|
|
||||||
async with websockets.connect(uri) as websocket:
|
|
||||||
self.connection_status = "Connected"
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
data = await websocket.recv()
|
|
||||||
parsed_data = json.loads(data)
|
|
||||||
self.handle_received_data(parsed_data)
|
|
||||||
except websockets.ConnectionClosedOK:
|
|
||||||
self.connection_status = "Disconnected"
|
|
||||||
Logger.debug("SERVER DISCONNECT")
|
|
||||||
|
|
||||||
def handle_received_data(self, data):
|
|
||||||
# Update your UI or perform actions with received data here
|
|
||||||
Logger.debug(f"Received data: {data}")
|
|
||||||
processed_agent_data_list = sorted(
|
|
||||||
[
|
|
||||||
ProcessedAgentData(**processed_data_json)
|
|
||||||
for processed_data_json in json.loads(data)
|
|
||||||
],
|
|
||||||
key=lambda v: v.timestamp,
|
|
||||||
)
|
|
||||||
new_points = [
|
|
||||||
(
|
|
||||||
processed_agent_data.latitude,
|
|
||||||
processed_agent_data.longitude,
|
|
||||||
processed_agent_data.road_state,
|
|
||||||
)
|
|
||||||
for processed_agent_data in processed_agent_data_list
|
|
||||||
]
|
|
||||||
self._new_points.extend(new_points)
|
|
||||||
Binary file not shown.
|
Before Width: | Height: | Size: 2.4 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 4.0 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 4.3 KiB |
@ -1,147 +0,0 @@
|
|||||||
from kivy_garden.mapview import MapLayer, MapMarker
|
|
||||||
from kivy.graphics import Color, Line
|
|
||||||
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
|
|
||||||
from kivy_garden.mapview.utils import clamp
|
|
||||||
from kivy_garden.mapview.constants import (MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE)
|
|
||||||
from math import radians, log, tan, cos, pi
|
|
||||||
|
|
||||||
|
|
||||||
class LineMapLayer(MapLayer):
|
|
||||||
def __init__(self, coordinates=None, color=[0, 0, 1, 1], width=2, **kwargs):
|
|
||||||
super().__init__(**kwargs)
|
|
||||||
# if coordinates is None:
|
|
||||||
# coordinates = [[0, 0], [0, 0]]
|
|
||||||
self._coordinates = coordinates
|
|
||||||
self.color = color
|
|
||||||
self._line_points = None
|
|
||||||
self._line_points_offset = (0, 0)
|
|
||||||
self.zoom = 0
|
|
||||||
self.lon = 0
|
|
||||||
self.lat = 0
|
|
||||||
self.ms = 0
|
|
||||||
self._width = width
|
|
||||||
|
|
||||||
@property
|
|
||||||
def coordinates(self):
|
|
||||||
return self._coordinates
|
|
||||||
|
|
||||||
@coordinates.setter
|
|
||||||
def coordinates(self, coordinates):
|
|
||||||
self._coordinates = coordinates
|
|
||||||
self.invalidate_line_points()
|
|
||||||
self.clear_and_redraw()
|
|
||||||
|
|
||||||
def add_point(self, point):
|
|
||||||
if self._coordinates is None:
|
|
||||||
#self._coordinates = [point]
|
|
||||||
self._coordinates = []
|
|
||||||
self._coordinates.append(point)
|
|
||||||
# self._coordinates = [self._coordinates[-1], point]
|
|
||||||
self.invalidate_line_points()
|
|
||||||
self.clear_and_redraw()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def line_points(self):
|
|
||||||
if self._line_points is None:
|
|
||||||
self.calc_line_points()
|
|
||||||
return self._line_points
|
|
||||||
|
|
||||||
@property
|
|
||||||
def line_points_offset(self):
|
|
||||||
if self._line_points is None:
|
|
||||||
self.calc_line_points()
|
|
||||||
return self._line_points_offset
|
|
||||||
|
|
||||||
def calc_line_points(self):
|
|
||||||
# Offset all points by the coordinates of the first point,
|
|
||||||
# to keep coordinates closer to zero.
|
|
||||||
# (and therefore avoid some float precision issues when drawing lines)
|
|
||||||
self._line_points_offset = (self.get_x(self.coordinates[0][1]),
|
|
||||||
self.get_y(self.coordinates[0][0]))
|
|
||||||
# Since lat is not a linear transform we must compute manually
|
|
||||||
self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
|
|
||||||
self.get_y(lat) - self._line_points_offset[1])
|
|
||||||
for lat, lon in self.coordinates]
|
|
||||||
|
|
||||||
def invalidate_line_points(self):
|
|
||||||
self._line_points = None
|
|
||||||
self._line_points_offset = (0, 0)
|
|
||||||
|
|
||||||
def get_x(self, lon):
|
|
||||||
"""Get the x position on the map using this map source's projection
|
|
||||||
(0, 0) is located at the top left.
|
|
||||||
"""
|
|
||||||
return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0
|
|
||||||
|
|
||||||
def get_y(self, lat):
|
|
||||||
"""Get the y position on the map using this map source's projection
|
|
||||||
(0, 0) is located at the top left.
|
|
||||||
"""
|
|
||||||
lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
|
|
||||||
return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0
|
|
||||||
|
|
||||||
# Function called when the MapView is moved
|
|
||||||
def reposition(self):
|
|
||||||
map_view = self.parent
|
|
||||||
|
|
||||||
# Must redraw when the zoom changes
|
|
||||||
# as the scatter transform resets for the new tiles
|
|
||||||
if self.zoom != map_view.zoom or \
|
|
||||||
self.lon != round(map_view.lon, 7) or \
|
|
||||||
self.lat != round(map_view.lat, 7):
|
|
||||||
map_source = map_view.map_source
|
|
||||||
self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
|
|
||||||
self.invalidate_line_points()
|
|
||||||
self.clear_and_redraw()
|
|
||||||
|
|
||||||
def clear_and_redraw(self, *args):
|
|
||||||
with self.canvas:
|
|
||||||
# Clear old line
|
|
||||||
self.canvas.clear()
|
|
||||||
|
|
||||||
self._draw_line()
|
|
||||||
|
|
||||||
def _draw_line(self, *args):
|
|
||||||
if self._coordinates is None:
|
|
||||||
return
|
|
||||||
map_view = self.parent
|
|
||||||
self.zoom = map_view.zoom
|
|
||||||
self.lon = map_view.lon
|
|
||||||
self.lat = map_view.lat
|
|
||||||
|
|
||||||
# When zooming we must undo the current scatter transform
|
|
||||||
# or the animation distorts it
|
|
||||||
scatter = map_view._scatter
|
|
||||||
sx, sy, ss = scatter.x, scatter.y, scatter.scale
|
|
||||||
|
|
||||||
# Account for map source tile size and map view zoom
|
|
||||||
vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale
|
|
||||||
|
|
||||||
with self.canvas:
|
|
||||||
self.opacity = 0.5
|
|
||||||
# Save the current coordinate space context
|
|
||||||
PushMatrix()
|
|
||||||
|
|
||||||
# Offset by the MapView's position in the window (always 0,0 ?)
|
|
||||||
Translate(*map_view.pos)
|
|
||||||
|
|
||||||
# Undo the scatter animation transform
|
|
||||||
Scale(1 / ss, 1 / ss, 1)
|
|
||||||
Translate(-sx, -sy)
|
|
||||||
|
|
||||||
# Apply the get window xy from transforms
|
|
||||||
Scale(vs, vs, 1)
|
|
||||||
Translate(-vx, -vy)
|
|
||||||
|
|
||||||
# Apply what we can factor out of the mapsource long, lat to x, y conversion
|
|
||||||
Translate(self.ms / 2, 0)
|
|
||||||
|
|
||||||
# Translate by the offset of the line points
|
|
||||||
# (this keeps the points closer to the origin)
|
|
||||||
Translate(*self.line_points_offset)
|
|
||||||
|
|
||||||
Color(*self.color)
|
|
||||||
Line(points=self.line_points, width=self._width)
|
|
||||||
|
|
||||||
# Retrieve the last saved coordinate space context
|
|
||||||
PopMatrix()
|
|
||||||
@ -1,55 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
from kivy.app import App
|
|
||||||
from kivy_garden.mapview import MapMarker, MapView
|
|
||||||
from kivy.clock import Clock
|
|
||||||
from lineMapLayer import LineMapLayer
|
|
||||||
from datasource import Datasource
|
|
||||||
|
|
||||||
|
|
||||||
class MapViewApp(App):
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super().__init__()
|
|
||||||
# додати необхідні змінні
|
|
||||||
|
|
||||||
def on_start(self):
|
|
||||||
"""
|
|
||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
|
||||||
"""
|
|
||||||
|
|
||||||
def update(self, *args):
|
|
||||||
"""
|
|
||||||
Викликається регулярно для оновлення мапи
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
def update_car_marker(self, point):
|
|
||||||
"""
|
|
||||||
Оновлює відображення маркера машини на мапі
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def set_pothole_marker(self, point):
|
|
||||||
"""
|
|
||||||
Встановлює маркер для ями
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def set_bump_marker(self, point):
|
|
||||||
"""
|
|
||||||
Встановлює маркер для лежачого поліцейського
|
|
||||||
:param point: GPS координати
|
|
||||||
"""
|
|
||||||
|
|
||||||
def build(self):
|
|
||||||
"""
|
|
||||||
Ініціалізує мапу MapView(zoom, lat, lon)
|
|
||||||
:return: мапу
|
|
||||||
"""
|
|
||||||
self.mapview = MapView()
|
|
||||||
return self.mapview
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(MapViewApp().async_run(async_lib="asyncio"))
|
|
||||||
loop.close()
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
annotated-types==0.6.0
|
|
||||||
certifi==2024.2.2
|
|
||||||
charset-normalizer==3.3.2
|
|
||||||
docutils==0.20.1
|
|
||||||
idna==3.6
|
|
||||||
Kivy==2.3.0
|
|
||||||
Kivy-Garden==0.1.5
|
|
||||||
kivy-garden.mapview==1.0.6
|
|
||||||
pydantic==2.6.2
|
|
||||||
pydantic_core==2.16.3
|
|
||||||
Pygments==2.17.2
|
|
||||||
requests==2.31.0
|
|
||||||
typing_extensions==4.10.0
|
|
||||||
urllib3==2.2.1
|
|
||||||
websockets==12.0
|
|
||||||
@ -1,3 +1,4 @@
|
|||||||
|
version: "3.9"
|
||||||
name: "road_vision"
|
name: "road_vision"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
|
|||||||
@ -8,7 +8,6 @@ def try_parse(type, value: str):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
USER_ID = 1
|
|
||||||
# MQTT config
|
# MQTT config
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
|||||||
@ -9,5 +9,4 @@ from domain.gps import Gps
|
|||||||
class AggregatedData:
|
class AggregatedData:
|
||||||
accelerometer: Accelerometer
|
accelerometer: Accelerometer
|
||||||
gps: Gps
|
gps: Gps
|
||||||
timestamp: datetime
|
time: datetime
|
||||||
user_id: int
|
|
||||||
|
|||||||
@ -1,9 +1,6 @@
|
|||||||
from csv import reader
|
from csv import reader
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from domain.accelerometer import Accelerometer
|
|
||||||
from domain.gps import Gps
|
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
import config
|
|
||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
@ -16,12 +13,6 @@ class FileDatasource:
|
|||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def read(self) -> AggregatedData:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
"""Метод повертає дані отримані з датчиків"""
|
||||||
return AggregatedData(
|
|
||||||
Accelerometer(1, 2, 3),
|
|
||||||
Gps(4, 5),
|
|
||||||
datetime.now(),
|
|
||||||
config.USER_ID,
|
|
||||||
)
|
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Метод повинен викликатись перед початком читання даних"""
|
||||||
|
|||||||
@ -6,5 +6,4 @@ from schema.gps_schema import GpsSchema
|
|||||||
class AggregatedDataSchema(Schema):
|
class AggregatedDataSchema(Schema):
|
||||||
accelerometer = fields.Nested(AccelerometerSchema)
|
accelerometer = fields.Nested(AccelerometerSchema)
|
||||||
gps = fields.Nested(GpsSchema)
|
gps = fields.Nested(GpsSchema)
|
||||||
timestamp = fields.DateTime("iso")
|
time = fields.DateTime("iso")
|
||||||
user_id = fields.Int()
|
|
||||||
|
|||||||
2
edge/.gitignore
vendored
2
edge/.gitignore
vendored
@ -1,2 +0,0 @@
|
|||||||
venv
|
|
||||||
app.log
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
# Use the official Python image as the base image
|
|
||||||
FROM python:3.9-slim
|
|
||||||
# Set the working directory inside the container
|
|
||||||
WORKDIR /app
|
|
||||||
# Copy the requirements.txt file and install dependencies
|
|
||||||
COPY requirements.txt .
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
# Copy the entire application into the container
|
|
||||||
COPY . .
|
|
||||||
# Run the main.py script inside the container when it starts
|
|
||||||
CMD ["python", "main.py"]
|
|
||||||
@ -1,76 +0,0 @@
|
|||||||
import logging
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
from app.interfaces.agent_gateway import AgentGateway
|
|
||||||
from app.entities.agent_data import AgentData, GpsData
|
|
||||||
from app.usecases.data_processing import process_agent_data
|
|
||||||
from app.interfaces.hub_gateway import HubGateway
|
|
||||||
|
|
||||||
|
|
||||||
class AgentMQTTAdapter(AgentGateway):
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
broker_host,
|
|
||||||
broker_port,
|
|
||||||
topic,
|
|
||||||
hub_gateway: HubGateway,
|
|
||||||
batch_size=10,
|
|
||||||
):
|
|
||||||
self.batch_size = batch_size
|
|
||||||
# MQTT
|
|
||||||
self.broker_host = broker_host
|
|
||||||
self.broker_port = broker_port
|
|
||||||
self.topic = topic
|
|
||||||
self.client = mqtt.Client()
|
|
||||||
# Hub
|
|
||||||
self.hub_gateway = hub_gateway
|
|
||||||
|
|
||||||
def on_connect(self, client, userdata, flags, rc):
|
|
||||||
if rc == 0:
|
|
||||||
logging.info("Connected to MQTT broker")
|
|
||||||
self.client.subscribe(self.topic)
|
|
||||||
else:
|
|
||||||
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
|
||||||
|
|
||||||
def on_message(self, client, userdata, msg):
|
|
||||||
"""Processing agent data and sent it to hub gateway"""
|
|
||||||
try:
|
|
||||||
payload: str = msg.payload.decode("utf-8")
|
|
||||||
# Create AgentData instance with the received data
|
|
||||||
agent_data = AgentData.model_validate_json(payload, strict=True)
|
|
||||||
# Process the received data (you can call a use case here if needed)
|
|
||||||
processed_data = process_agent_data(agent_data)
|
|
||||||
# Store the agent_data in the database (you can send it to the data processing module)
|
|
||||||
if not self.hub_gateway.save_data(processed_data):
|
|
||||||
logging.error("Hub is not available")
|
|
||||||
except Exception as e:
|
|
||||||
logging.info(f"Error processing MQTT message: {e}")
|
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
self.client.on_connect = self.on_connect
|
|
||||||
self.client.on_message = self.on_message
|
|
||||||
self.client.connect(self.broker_host, self.broker_port, 60)
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self.client.loop_start()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.client.loop_stop()
|
|
||||||
|
|
||||||
|
|
||||||
# Usage example:
|
|
||||||
if __name__ == "__main__":
|
|
||||||
broker_host = "localhost"
|
|
||||||
broker_port = 1883
|
|
||||||
topic = "agent_data_topic"
|
|
||||||
# Assuming you have implemented the StoreGateway and passed it to the adapter
|
|
||||||
store_gateway = HubGateway()
|
|
||||||
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
|
|
||||||
adapter.connect()
|
|
||||||
adapter.start()
|
|
||||||
try:
|
|
||||||
# Keep the adapter running in the background
|
|
||||||
while True:
|
|
||||||
pass
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
adapter.stop()
|
|
||||||
logging.info("Adapter stopped.")
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
import requests as requests
|
|
||||||
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
from app.interfaces.hub_gateway import HubGateway
|
|
||||||
|
|
||||||
|
|
||||||
class HubHttpAdapter(HubGateway):
|
|
||||||
def __init__(self, api_base_url):
|
|
||||||
self.api_base_url = api_base_url
|
|
||||||
|
|
||||||
def save_data(self, processed_data: ProcessedAgentData):
|
|
||||||
"""
|
|
||||||
Save the processed road data to the Hub.
|
|
||||||
Parameters:
|
|
||||||
processed_data (ProcessedAgentData): Processed road data to be saved.
|
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
|
||||||
"""
|
|
||||||
url = f"{self.api_base_url}/processed_agent_data/"
|
|
||||||
|
|
||||||
response = requests.post(url, data=processed_data.model_dump_json())
|
|
||||||
if response.status_code != 200:
|
|
||||||
logging.info(
|
|
||||||
f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
@ -1,50 +0,0 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
import requests as requests
|
|
||||||
from paho.mqtt import client as mqtt_client
|
|
||||||
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
from app.interfaces.hub_gateway import HubGateway
|
|
||||||
|
|
||||||
|
|
||||||
class HubMqttAdapter(HubGateway):
|
|
||||||
def __init__(self, broker, port, topic):
|
|
||||||
self.broker = broker
|
|
||||||
self.port = port
|
|
||||||
self.topic = topic
|
|
||||||
self.mqtt_client = self._connect_mqtt(broker, port)
|
|
||||||
|
|
||||||
def save_data(self, processed_data: ProcessedAgentData):
|
|
||||||
"""
|
|
||||||
Save the processed road data to the Hub.
|
|
||||||
Parameters:
|
|
||||||
processed_data (ProcessedAgentData): Processed road data to be saved.
|
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
|
||||||
"""
|
|
||||||
msg = processed_data.model_dump_json()
|
|
||||||
result = self.mqtt_client.publish(self.topic, msg)
|
|
||||||
status = result[0]
|
|
||||||
if status == 0:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {self.topic}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _connect_mqtt(broker, port):
|
|
||||||
"""Create MQTT client"""
|
|
||||||
print(f"CONNECT TO {broker}:{port}")
|
|
||||||
|
|
||||||
def on_connect(client, userdata, flags, rc):
|
|
||||||
if rc == 0:
|
|
||||||
print(f"Connected to MQTT Broker ({broker}:{port})!")
|
|
||||||
else:
|
|
||||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
|
||||||
exit(rc) # Stop execution
|
|
||||||
|
|
||||||
client = mqtt_client.Client()
|
|
||||||
client.on_connect = on_connect
|
|
||||||
client.connect(broker, port)
|
|
||||||
client.loop_start()
|
|
||||||
return client
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
from datetime import datetime
|
|
||||||
from pydantic import BaseModel, field_validator
|
|
||||||
|
|
||||||
|
|
||||||
class AccelerometerData(BaseModel):
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
|
|
||||||
|
|
||||||
class GpsData(BaseModel):
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
|
|
||||||
|
|
||||||
class AgentData(BaseModel):
|
|
||||||
accelerometer: AccelerometerData
|
|
||||||
gps: GpsData
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@field_validator("timestamp", mode="before")
|
|
||||||
def parse_timestamp(cls, value):
|
|
||||||
# Convert the timestamp to a datetime object
|
|
||||||
if isinstance(value, datetime):
|
|
||||||
return value
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
|
||||||
)
|
|
||||||
@ -1,7 +0,0 @@
|
|||||||
from pydantic import BaseModel
|
|
||||||
from app.entities.agent_data import AgentData
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessedAgentData(BaseModel):
|
|
||||||
road_state: str
|
|
||||||
agent_data: AgentData
|
|
||||||
@ -1,40 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
|
|
||||||
class AgentGateway(ABC):
|
|
||||||
"""
|
|
||||||
Abstract class representing the Agent Gateway interface.
|
|
||||||
All agent gateway adapters must implement these methods.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def on_message(self, client, userdata, msg):
|
|
||||||
"""
|
|
||||||
Method to handle incoming messages from the agent.
|
|
||||||
Parameters:
|
|
||||||
client: MQTT client instance.
|
|
||||||
userdata: Any additional user data passed to the MQTT client.
|
|
||||||
msg: The MQTT message received from the agent.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def connect(self):
|
|
||||||
"""
|
|
||||||
Method to establish a connection to the agent.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def start(self):
|
|
||||||
"""
|
|
||||||
Method to start listening for messages from the agent.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def stop(self):
|
|
||||||
"""
|
|
||||||
Method to stop the agent gateway and clean up resources.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
@ -1,20 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
|
|
||||||
|
|
||||||
class HubGateway(ABC):
|
|
||||||
"""
|
|
||||||
Abstract class representing the Store Gateway interface.
|
|
||||||
All store gateway adapters must implement these methods.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def save_data(self, processed_data: ProcessedAgentData) -> bool:
|
|
||||||
"""
|
|
||||||
Method to save the processed agent data in the database.
|
|
||||||
Parameters:
|
|
||||||
processed_data (ProcessedAgentData): The processed agent data to be saved.
|
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
from app.entities.agent_data import AgentData
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
|
|
||||||
|
|
||||||
def process_agent_data(
|
|
||||||
agent_data: AgentData,
|
|
||||||
) -> ProcessedAgentData:
|
|
||||||
"""
|
|
||||||
Process agent data and classify the state of the road surface.
|
|
||||||
Parameters:
|
|
||||||
agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp.
|
|
||||||
Returns:
|
|
||||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
|
||||||
"""
|
|
||||||
# Implement it
|
|
||||||
@ -1,24 +0,0 @@
|
|||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
def try_parse_int(value: str):
|
|
||||||
try:
|
|
||||||
return int(value)
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# Configuration for agent MQTT
|
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
|
||||||
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
|
||||||
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
|
|
||||||
|
|
||||||
# Configuration for hub MQTT
|
|
||||||
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
|
||||||
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
|
||||||
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
|
|
||||||
|
|
||||||
# Configuration for the Hub
|
|
||||||
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
|
||||||
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
|
|
||||||
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
|
||||||
@ -1,48 +0,0 @@
|
|||||||
version: "3.9"
|
|
||||||
# name: "road_vision"
|
|
||||||
services:
|
|
||||||
mqtt:
|
|
||||||
image: eclipse-mosquitto
|
|
||||||
container_name: mqtt
|
|
||||||
volumes:
|
|
||||||
- ./mosquitto:/mosquitto
|
|
||||||
- ./mosquitto/data:/mosquitto/data
|
|
||||||
- ./mosquitto/log:/mosquitto/log
|
|
||||||
ports:
|
|
||||||
- 1883:1883
|
|
||||||
- 19001:9001
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
|
|
||||||
|
|
||||||
edge:
|
|
||||||
container_name: edge
|
|
||||||
build: ../
|
|
||||||
depends_on:
|
|
||||||
- mqtt
|
|
||||||
environment:
|
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
|
||||||
MQTT_BROKER_PORT: 1883
|
|
||||||
MQTT_TOPIC: " "
|
|
||||||
HUB_HOST: "store"
|
|
||||||
HUB_PORT: 8000
|
|
||||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
|
||||||
HUB_MQTT_BROKER_PORT: 1883
|
|
||||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
edge_hub:
|
|
||||||
|
|
||||||
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
db_network:
|
|
||||||
edge_hub:
|
|
||||||
hub:
|
|
||||||
hub_store:
|
|
||||||
hub_redis:
|
|
||||||
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
postgres_data:
|
|
||||||
pgadmin-data:
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
persistence true
|
|
||||||
persistence_location /mosquitto/data/
|
|
||||||
listener 1883
|
|
||||||
## Authentication ##
|
|
||||||
allow_anonymous true
|
|
||||||
# allow_anonymous false
|
|
||||||
# password_file /mosquitto/config/password.txt
|
|
||||||
## Log ##
|
|
||||||
log_dest file /mosquitto/log/mosquitto.log
|
|
||||||
log_dest stdout
|
|
||||||
# listener 1883
|
|
||||||
51
edge/main.py
51
edge/main.py
@ -1,51 +0,0 @@
|
|||||||
import logging
|
|
||||||
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
|
||||||
from app.adapters.hub_http_adapter import HubHttpAdapter
|
|
||||||
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
|
||||||
from config import (
|
|
||||||
MQTT_BROKER_HOST,
|
|
||||||
MQTT_BROKER_PORT,
|
|
||||||
MQTT_TOPIC,
|
|
||||||
HUB_URL,
|
|
||||||
HUB_MQTT_BROKER_HOST,
|
|
||||||
HUB_MQTT_BROKER_PORT,
|
|
||||||
HUB_MQTT_TOPIC,
|
|
||||||
)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Configure logging settings
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
|
||||||
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(), # Output log messages to the console
|
|
||||||
logging.FileHandler("app.log"), # Save log messages to a file
|
|
||||||
],
|
|
||||||
)
|
|
||||||
# Create an instance of the StoreApiAdapter using the configuration
|
|
||||||
# hub_adapter = HubHttpAdapter(
|
|
||||||
# api_base_url=HUB_URL,
|
|
||||||
# )
|
|
||||||
hub_adapter = HubMqttAdapter(
|
|
||||||
broker=HUB_MQTT_BROKER_HOST,
|
|
||||||
port=HUB_MQTT_BROKER_PORT,
|
|
||||||
topic=HUB_MQTT_TOPIC,
|
|
||||||
)
|
|
||||||
# Create an instance of the AgentMQTTAdapter using the configuration
|
|
||||||
agent_adapter = AgentMQTTAdapter(
|
|
||||||
broker_host=MQTT_BROKER_HOST,
|
|
||||||
broker_port=MQTT_BROKER_PORT,
|
|
||||||
topic=MQTT_TOPIC,
|
|
||||||
hub_gateway=hub_adapter,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
# Connect to the MQTT broker and start listening for messages
|
|
||||||
agent_adapter.connect()
|
|
||||||
agent_adapter.start()
|
|
||||||
# Keep the system running indefinitely (you can add other logic as needed)
|
|
||||||
while True:
|
|
||||||
pass
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
|
||||||
agent_adapter.stop()
|
|
||||||
logging.info("System stopped.")
|
|
||||||
@ -1,10 +0,0 @@
|
|||||||
annotated-types==0.6.0
|
|
||||||
certifi==2024.2.2
|
|
||||||
charset-normalizer==3.3.2
|
|
||||||
idna==3.6
|
|
||||||
paho-mqtt==1.6.1
|
|
||||||
pydantic==2.6.1
|
|
||||||
pydantic_core==2.16.2
|
|
||||||
requests==2.31.0
|
|
||||||
typing_extensions==4.9.0
|
|
||||||
urllib3==2.2.0
|
|
||||||
2
hub/.gitignore
vendored
2
hub/.gitignore
vendored
@ -1,2 +0,0 @@
|
|||||||
venv
|
|
||||||
__pycache__
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
# Use the official Python image as the base image
|
|
||||||
FROM python:3.9-slim
|
|
||||||
# Set the working directory inside the container
|
|
||||||
WORKDIR /app
|
|
||||||
# Copy the requirements.txt file and install dependencies
|
|
||||||
COPY requirements.txt .
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
# Copy the entire application into the container
|
|
||||||
COPY . .
|
|
||||||
# Run the main.py script inside the container when it starts
|
|
||||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
|
||||||
@ -1,33 +0,0 @@
|
|||||||
# Hub
|
|
||||||
## Instructions for Starting the Project
|
|
||||||
To start the Hub, follow these steps:
|
|
||||||
1. Clone the repository to your local machine:
|
|
||||||
```bash
|
|
||||||
git clone https://github.com/Toolf/hub.git
|
|
||||||
cd hub
|
|
||||||
```
|
|
||||||
2. Create and activate a virtual environment (optional but recommended):
|
|
||||||
```bash
|
|
||||||
python -m venv venv
|
|
||||||
source venv/bin/activate # On Windows, use: venv\Scripts\activate
|
|
||||||
```
|
|
||||||
3. Install the project dependencies:
|
|
||||||
```bash
|
|
||||||
pip install -r requirements.txt
|
|
||||||
```
|
|
||||||
4. Run the system:
|
|
||||||
```bash
|
|
||||||
python ./app/main.py
|
|
||||||
```
|
|
||||||
The system will start collecting data from the agent through MQTT and processing it.
|
|
||||||
## Running Tests
|
|
||||||
To run tests for the project, use the following command:
|
|
||||||
```bash
|
|
||||||
python -m unittest discover tests
|
|
||||||
```
|
|
||||||
## Common Commands
|
|
||||||
### 1. Saving Requirements
|
|
||||||
To save the project dependencies to the requirements.txt file:
|
|
||||||
```bash
|
|
||||||
pip freeze > requirements.txt
|
|
||||||
```
|
|
||||||
@ -1,24 +0,0 @@
|
|||||||
import json
|
|
||||||
import logging
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
import pydantic_core
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
from app.interfaces.store_gateway import StoreGateway
|
|
||||||
|
|
||||||
|
|
||||||
class StoreApiAdapter(StoreGateway):
|
|
||||||
def __init__(self, api_base_url):
|
|
||||||
self.api_base_url = api_base_url
|
|
||||||
|
|
||||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
|
||||||
"""
|
|
||||||
Save the processed road data to the Store API.
|
|
||||||
Parameters:
|
|
||||||
processed_agent_data_batch (dict): Processed road data to be saved.
|
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
|
||||||
"""
|
|
||||||
# Implement it
|
|
||||||
@ -1,33 +0,0 @@
|
|||||||
from datetime import datetime
|
|
||||||
from pydantic import BaseModel, field_validator
|
|
||||||
|
|
||||||
|
|
||||||
class AccelerometerData(BaseModel):
|
|
||||||
x: float
|
|
||||||
y: float
|
|
||||||
z: float
|
|
||||||
|
|
||||||
|
|
||||||
class GpsData(BaseModel):
|
|
||||||
latitude: float
|
|
||||||
longitude: float
|
|
||||||
|
|
||||||
|
|
||||||
class AgentData(BaseModel):
|
|
||||||
user_id: int
|
|
||||||
accelerometer: AccelerometerData
|
|
||||||
gps: GpsData
|
|
||||||
timestamp: datetime
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@field_validator('timestamp', mode='before')
|
|
||||||
def parse_timestamp(cls, value):
|
|
||||||
# Convert the timestamp to a datetime object
|
|
||||||
if isinstance(value, datetime):
|
|
||||||
return value
|
|
||||||
try:
|
|
||||||
return datetime.fromisoformat(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
raise ValueError(
|
|
||||||
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
|
||||||
)
|
|
||||||
@ -1,7 +0,0 @@
|
|||||||
from pydantic import BaseModel
|
|
||||||
from app.entities.agent_data import AgentData
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessedAgentData(BaseModel):
|
|
||||||
road_state: str
|
|
||||||
agent_data: AgentData
|
|
||||||
@ -1,21 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
from typing import List
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
|
|
||||||
|
|
||||||
class StoreGateway(ABC):
|
|
||||||
"""
|
|
||||||
Abstract class representing the Store Gateway interface.
|
|
||||||
All store gateway adapters must implement these methods.
|
|
||||||
"""
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
|
|
||||||
"""
|
|
||||||
Method to save the processed agent data in the database.
|
|
||||||
Parameters:
|
|
||||||
processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved.
|
|
||||||
Returns:
|
|
||||||
bool: True if the data is successfully saved, False otherwise.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
@ -1,26 +0,0 @@
|
|||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
def try_parse_int(value: str):
|
|
||||||
try:
|
|
||||||
return int(value)
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# Configuration for the Store API
|
|
||||||
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
|
|
||||||
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
|
|
||||||
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
|
|
||||||
|
|
||||||
# Configure for Redis
|
|
||||||
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
|
|
||||||
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
|
|
||||||
|
|
||||||
# Configure for hub logic
|
|
||||||
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
|
|
||||||
|
|
||||||
# MQTT
|
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
|
||||||
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
|
||||||
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
CREATE TABLE processed_agent_data (
|
|
||||||
id SERIAL PRIMARY KEY,
|
|
||||||
road_state VARCHAR(255) NOT NULL,
|
|
||||||
user_id INTEGER NOT NULL,
|
|
||||||
x FLOAT,
|
|
||||||
y FLOAT,
|
|
||||||
z FLOAT,
|
|
||||||
latitude FLOAT,
|
|
||||||
longitude FLOAT,
|
|
||||||
timestamp TIMESTAMP
|
|
||||||
);
|
|
||||||
@ -1,111 +0,0 @@
|
|||||||
version: "3.9"
|
|
||||||
name: "road_vision__hub"
|
|
||||||
services:
|
|
||||||
mqtt:
|
|
||||||
image: eclipse-mosquitto
|
|
||||||
container_name: mqtt
|
|
||||||
volumes:
|
|
||||||
- ./mosquitto:/mosquitto
|
|
||||||
- ./mosquitto/data:/mosquitto/data
|
|
||||||
- ./mosquitto/log:/mosquitto/log
|
|
||||||
ports:
|
|
||||||
- 1883:1883
|
|
||||||
- 9001:9001
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
|
|
||||||
|
|
||||||
postgres_db:
|
|
||||||
image: postgres:latest
|
|
||||||
container_name: postgres_db
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: user
|
|
||||||
POSTGRES_PASSWORD: pass
|
|
||||||
POSTGRES_DB: test_db
|
|
||||||
volumes:
|
|
||||||
- postgres_data:/var/lib/postgresql/data
|
|
||||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
|
||||||
ports:
|
|
||||||
- "5432:5432"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
pgadmin:
|
|
||||||
container_name: pgadmin4
|
|
||||||
image: dpage/pgadmin4
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
|
||||||
PGADMIN_DEFAULT_PASSWORD: root
|
|
||||||
volumes:
|
|
||||||
- pgadmin-data:/var/lib/pgadmin
|
|
||||||
ports:
|
|
||||||
- "5050:80"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
|
|
||||||
|
|
||||||
store:
|
|
||||||
container_name: store
|
|
||||||
build: ../../store
|
|
||||||
depends_on:
|
|
||||||
- postgres_db
|
|
||||||
restart: always
|
|
||||||
environment:
|
|
||||||
POSTGRES_USER: user
|
|
||||||
POSTGRES_PASSWORD: pass
|
|
||||||
POSTGRES_DB: test_db
|
|
||||||
POSTGRES_HOST: postgres_db
|
|
||||||
POSTGRES_PORT: 5432
|
|
||||||
ports:
|
|
||||||
- "8000:8000"
|
|
||||||
networks:
|
|
||||||
db_network:
|
|
||||||
hub_store:
|
|
||||||
|
|
||||||
|
|
||||||
redis:
|
|
||||||
image: redis:latest
|
|
||||||
container_name: redis
|
|
||||||
ports:
|
|
||||||
- "6379:6379"
|
|
||||||
networks:
|
|
||||||
hub_redis:
|
|
||||||
|
|
||||||
|
|
||||||
hub:
|
|
||||||
container_name: hub
|
|
||||||
build: ../
|
|
||||||
depends_on:
|
|
||||||
- mqtt
|
|
||||||
- redis
|
|
||||||
- store
|
|
||||||
environment:
|
|
||||||
STORE_API_HOST: "store"
|
|
||||||
STORE_API_PORT: 8000
|
|
||||||
REDIS_HOST: "redis"
|
|
||||||
REDIS_PORT: 6379
|
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
|
||||||
MQTT_BROKER_PORT: 1883
|
|
||||||
MQTT_TOPIC: "processed_data_topic"
|
|
||||||
BATCH_SIZE: 1
|
|
||||||
ports:
|
|
||||||
- "9000:8000"
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
hub_store:
|
|
||||||
hub_redis:
|
|
||||||
|
|
||||||
|
|
||||||
networks:
|
|
||||||
mqtt_network:
|
|
||||||
db_network:
|
|
||||||
hub_store:
|
|
||||||
hub_redis:
|
|
||||||
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
postgres_data:
|
|
||||||
pgadmin-data:
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
persistence true
|
|
||||||
persistence_location /mosquitto/data/
|
|
||||||
listener 1883
|
|
||||||
## Authentication ##
|
|
||||||
allow_anonymous true
|
|
||||||
# allow_anonymous false
|
|
||||||
# password_file /mosquitto/config/password.txt
|
|
||||||
## Log ##
|
|
||||||
log_dest file /mosquitto/log/mosquitto.log
|
|
||||||
log_dest stdout
|
|
||||||
# listener 1883
|
|
||||||
96
hub/main.py
96
hub/main.py
@ -1,96 +0,0 @@
|
|||||||
import logging
|
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from fastapi import FastAPI
|
|
||||||
from redis import Redis
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
from app.adapters.store_api_adapter import StoreApiAdapter
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
from config import (
|
|
||||||
STORE_API_BASE_URL,
|
|
||||||
REDIS_HOST,
|
|
||||||
REDIS_PORT,
|
|
||||||
BATCH_SIZE,
|
|
||||||
MQTT_TOPIC,
|
|
||||||
MQTT_BROKER_HOST,
|
|
||||||
MQTT_BROKER_PORT,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Configure logging settings
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
|
||||||
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(), # Output log messages to the console
|
|
||||||
logging.FileHandler("app.log"), # Save log messages to a file
|
|
||||||
],
|
|
||||||
)
|
|
||||||
# Create an instance of the Redis using the configuration
|
|
||||||
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
|
|
||||||
# Create an instance of the StoreApiAdapter using the configuration
|
|
||||||
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
|
|
||||||
# Create an instance of the AgentMQTTAdapter using the configuration
|
|
||||||
|
|
||||||
# FastAPI
|
|
||||||
app = FastAPI()
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/processed_agent_data/")
|
|
||||||
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
|
|
||||||
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
|
|
||||||
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
|
||||||
processed_agent_data_batch: List[ProcessedAgentData] = []
|
|
||||||
for _ in range(BATCH_SIZE):
|
|
||||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
|
||||||
redis_client.lpop("processed_agent_data")
|
|
||||||
)
|
|
||||||
processed_agent_data_batch.append(processed_agent_data)
|
|
||||||
print(processed_agent_data_batch)
|
|
||||||
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
|
||||||
return {"status": "ok"}
|
|
||||||
|
|
||||||
|
|
||||||
# MQTT
|
|
||||||
client = mqtt.Client()
|
|
||||||
|
|
||||||
|
|
||||||
def on_connect(client, userdata, flags, rc):
|
|
||||||
if rc == 0:
|
|
||||||
logging.info("Connected to MQTT broker")
|
|
||||||
client.subscribe(MQTT_TOPIC)
|
|
||||||
else:
|
|
||||||
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
|
||||||
|
|
||||||
|
|
||||||
def on_message(client, userdata, msg):
|
|
||||||
try:
|
|
||||||
payload: str = msg.payload.decode("utf-8")
|
|
||||||
# Create ProcessedAgentData instance with the received data
|
|
||||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
|
||||||
payload, strict=True
|
|
||||||
)
|
|
||||||
|
|
||||||
redis_client.lpush(
|
|
||||||
"processed_agent_data", processed_agent_data.model_dump_json()
|
|
||||||
)
|
|
||||||
processed_agent_data_batch: List[ProcessedAgentData] = []
|
|
||||||
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
|
||||||
for _ in range(BATCH_SIZE):
|
|
||||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
|
||||||
redis_client.lpop("processed_agent_data")
|
|
||||||
)
|
|
||||||
processed_agent_data_batch.append(processed_agent_data)
|
|
||||||
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
|
||||||
return {"status": "ok"}
|
|
||||||
except Exception as e:
|
|
||||||
logging.info(f"Error processing MQTT message: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
# Connect
|
|
||||||
client.on_connect = on_connect
|
|
||||||
client.on_message = on_message
|
|
||||||
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
|
|
||||||
|
|
||||||
# Start
|
|
||||||
client.loop_start()
|
|
||||||
Binary file not shown.
@ -1,60 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest.mock import Mock
|
|
||||||
import redis
|
|
||||||
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
|
||||||
from app.interfaces.store_gateway import StoreGateway
|
|
||||||
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
|
||||||
from app.usecases.data_processing import process_agent_data_batch
|
|
||||||
|
|
||||||
class TestAgentMQTTAdapter(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
# Create a mock StoreGateway for testing
|
|
||||||
self.mock_store_gateway = Mock(spec=StoreGateway)
|
|
||||||
self.mock_redis = Mock(spec=redis.Redis)
|
|
||||||
# Create the AgentMQTTAdapter instance with the mock StoreGateway
|
|
||||||
self.agent_adapter = AgentMQTTAdapter(
|
|
||||||
broker_host="test_broker",
|
|
||||||
broker_port=1234,
|
|
||||||
topic="test_topic",
|
|
||||||
store_gateway=self.mock_store_gateway,
|
|
||||||
redis_client=self.mock_redis,
|
|
||||||
batch_size=1,
|
|
||||||
)
|
|
||||||
def test_on_message_valid_data(self):
|
|
||||||
# Test handling of valid incoming MQTT message
|
|
||||||
# (Assuming data is in the correct JSON format)
|
|
||||||
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
|
|
||||||
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
|
|
||||||
self.mock_redis.llen.return_value = 1
|
|
||||||
self.mock_redis.rpop.return_value = valid_json_data
|
|
||||||
# Call on_message with the mock message
|
|
||||||
self.agent_adapter.on_message(None, None, mock_msg)
|
|
||||||
# Ensure that the store_gateway's save_data method is called once with the correct arguments
|
|
||||||
expected_agent_data = AgentData(
|
|
||||||
user_id=1,
|
|
||||||
accelerometer=AccelerometerData(
|
|
||||||
x=0.1,
|
|
||||||
y=0.2,
|
|
||||||
z=0.3,
|
|
||||||
),
|
|
||||||
gps=GpsData(
|
|
||||||
latitude=10.123,
|
|
||||||
longitude=20.456,
|
|
||||||
),
|
|
||||||
timestamp="2023-07-21T12:34:56Z",
|
|
||||||
)
|
|
||||||
self.mock_store_gateway.save_data.assert_called_once_with(
|
|
||||||
process_agent_data_batch([expected_agent_data])
|
|
||||||
)
|
|
||||||
def test_on_message_invalid_data(self):
|
|
||||||
# Test handling of invalid incoming MQTT message
|
|
||||||
# (Assuming data is missing required fields or has incorrect format)
|
|
||||||
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
|
|
||||||
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
|
|
||||||
# Call on_message with the mock message
|
|
||||||
self.agent_adapter.on_message(None, None, mock_msg)
|
|
||||||
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
|
|
||||||
self.mock_store_gateway.save_data.assert_not_called()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
@ -1,72 +0,0 @@
|
|||||||
import requests
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import Mock, patch
|
|
||||||
from app.adapters.store_api_adapter import StoreApiAdapter
|
|
||||||
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
|
||||||
|
|
||||||
class TestStoreApiAdapter(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
# Create the StoreApiAdapter instance
|
|
||||||
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
|
|
||||||
@patch.object(requests, "post")
|
|
||||||
def test_save_data_success(self, mock_post):
|
|
||||||
# Test successful saving of data to the Store API
|
|
||||||
# Sample processed road data
|
|
||||||
agent_data = AgentData(
|
|
||||||
user_id=1,
|
|
||||||
accelerometer=AccelerometerData(
|
|
||||||
x=0.1,
|
|
||||||
y=0.2,
|
|
||||||
z=0.3,
|
|
||||||
),
|
|
||||||
gps=GpsData(
|
|
||||||
latitude=10.123,
|
|
||||||
longitude=20.456,
|
|
||||||
),
|
|
||||||
timestamp="2023-07-21T12:34:56Z",
|
|
||||||
)
|
|
||||||
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
|
||||||
# Mock the response from the Store API
|
|
||||||
mock_response = Mock(status_code=201) # 201 indicates successful creation
|
|
||||||
mock_post.return_value = mock_response
|
|
||||||
# Call the save_data method
|
|
||||||
result = self.store_api_adapter.save_data(processed_data)
|
|
||||||
# Ensure that the post method of the mock is called with the correct arguments
|
|
||||||
mock_post.assert_called_once_with(
|
|
||||||
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
|
||||||
)
|
|
||||||
# Ensure that the result is True, indicating successful saving
|
|
||||||
self.assertTrue(result)
|
|
||||||
@patch.object(requests, "post")
|
|
||||||
def test_save_data_failure(self, mock_post):
|
|
||||||
# Test failure to save data to the Store API
|
|
||||||
# Sample processed road data
|
|
||||||
agent_data = AgentData(
|
|
||||||
user_id=1,
|
|
||||||
accelerometer=AccelerometerData(
|
|
||||||
x=0.1,
|
|
||||||
y=0.2,
|
|
||||||
z=0.3,
|
|
||||||
),
|
|
||||||
gps=GpsData(
|
|
||||||
latitude=10.123,
|
|
||||||
longitude=20.456,
|
|
||||||
),
|
|
||||||
timestamp="2023-07-21T12:34:56Z",
|
|
||||||
)
|
|
||||||
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
|
||||||
# Mock the response from the Store API
|
|
||||||
mock_response = Mock(status_code=400) # 400 indicates a client error
|
|
||||||
mock_post.return_value = mock_response
|
|
||||||
# Call the save_data method
|
|
||||||
result = self.store_api_adapter.save_data(processed_data)
|
|
||||||
# Ensure that the post method of the mock is called with the correct arguments
|
|
||||||
mock_post.assert_called_once_with(
|
|
||||||
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
|
||||||
)
|
|
||||||
# Ensure that the result is False, indicating failure to save
|
|
||||||
self.assertFalse(result)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
Loading…
x
Reference in New Issue
Block a user