Compare commits
10 Commits
abd6bf0abe
...
b2c7427af0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2c7427af0 | ||
| e45faeb281 | |||
|
|
b8db2fe6ee | ||
|
|
0c5954f96c | ||
|
|
3a4fa51aa9 | ||
|
|
e32ba94adc | ||
|
|
93cc8d7378 | ||
|
|
b65670861d | ||
|
|
9a179e09e9 | ||
|
|
173a61d117 |
3
MapView/.gitignore
vendored
Normal file
3
MapView/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
.idea
|
||||
venv
|
||||
__pycache__
|
||||
102
MapView/README.md
Normal file
102
MapView/README.md
Normal file
@ -0,0 +1,102 @@
|
||||
## Лабораторна робота №5
|
||||
### Тема
|
||||
|
||||
Візуалізація якості стану дорожнього покриття за допомогою фреймворку Kivy.
|
||||
|
||||
### Мета
|
||||
|
||||
Розробити програму для візуалізації руху машини на дорозі та якості дороги за допомогою даних датчиків.
|
||||
|
||||
### Підготовка робочого середовища, встановлення проєкту
|
||||
|
||||
Створення віртуального середовища
|
||||
|
||||
`python -m venv ./venv`
|
||||
|
||||
Активація середовища для linux
|
||||
|
||||
`source ./venv/bin/activate`
|
||||
|
||||
Активація середовища для windows
|
||||
|
||||
`venv\Scripts\activate`
|
||||
|
||||
Встановлення необхідних бібліотек
|
||||
|
||||
`pip install -r requirements.txt`
|
||||
|
||||
### Завдання
|
||||
|
||||
Для відображення мапи використовується віджет [Mapview](https://mapview.readthedocs.io/en/1.0.4/) для Kivy.
|
||||
|
||||
Для візуалізації руху машини можна використовувати MapMarker та рухати його відповідно GPS даних.
|
||||
Для позначення нерівностей на дорозі також використовувати маркери. Зображення для маркерів можна знайти в папці images.
|
||||
|
||||
Для створення та редагування маршруту машини на мапі використовуйте клас LineMapLayer та функцію
|
||||
`add_point()` з файлу lineMapLayer.py. Додавання лінії на мапу:
|
||||
|
||||
```python
|
||||
map_layer = LineMapLayer()
|
||||
mapview.add_layer(map_layer, mode="scatter")
|
||||
```
|
||||
|
||||
Щоб створити затримку при відображенні руху машини можна використовувати
|
||||
функцію `kivy.clock.Clock.schedule_once()` або `kivy.clock.Clock.schedule_interval()`.
|
||||
|
||||
Дані для відображення на мапі (координати та стан дороги) зчитуються з бази даних через вебсокет.
|
||||
Для їх отримання використовуйте функцію `get_new_points()` з datasource.py.
|
||||
|
||||
**Шаблон основного файлу проєкту**
|
||||
|
||||
```python
|
||||
from kivy.app import App
|
||||
from kivy_garden.mapview import MapMarker, MapView
|
||||
from kivy.clock import Clock
|
||||
from lineMapLayer import LineMapLayer
|
||||
|
||||
|
||||
class MapViewApp(App):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__()
|
||||
# додати необхідні змінні
|
||||
|
||||
def on_start(self):
|
||||
"""
|
||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||
"""
|
||||
|
||||
def update(self, *args):
|
||||
"""
|
||||
Викликається регулярно для оновлення мапи
|
||||
"""
|
||||
|
||||
def update_car_marker(self, point):
|
||||
"""
|
||||
Оновлює відображення маркера машини на мапі
|
||||
:param point: GPS координати
|
||||
"""
|
||||
|
||||
def set_pothole_marker(self, point):
|
||||
"""
|
||||
Встановлює маркер для ями
|
||||
:param point: GPS координати
|
||||
"""
|
||||
|
||||
def set_bump_marker(self, point):
|
||||
"""
|
||||
Встановлює маркер для лежачого поліцейського
|
||||
:param point: GPS координати
|
||||
"""
|
||||
|
||||
def build(self):
|
||||
"""
|
||||
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||
:return: мапу
|
||||
"""
|
||||
self.mapview = MapView()
|
||||
return self.mapview
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
MapViewApp().run()
|
||||
```
|
||||
4
MapView/config.py
Normal file
4
MapView/config.py
Normal file
@ -0,0 +1,4 @@
|
||||
import os
|
||||
|
||||
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
||||
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
||||
1
MapView/data.csv
Normal file
1
MapView/data.csv
Normal file
File diff suppressed because one or more lines are too long
81
MapView/datasource.py
Normal file
81
MapView/datasource.py
Normal file
@ -0,0 +1,81 @@
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime
|
||||
import websockets
|
||||
from kivy import Logger
|
||||
from pydantic import BaseModel, field_validator
|
||||
from config import STORE_HOST, STORE_PORT
|
||||
|
||||
|
||||
# Pydantic models
|
||||
class ProcessedAgentData(BaseModel):
    """One processed road-condition sample received from the store service."""

    road_state: str  # road surface classification (exact values set by the producer)
    user_id: int
    x: float  # accelerometer axes
    y: float
    z: float
    latitude: float
    longitude: float
    timestamp: datetime

    # BUG FIX: with pydantic v2, @field_validator must be the outermost
    # decorator (applied on top of @classmethod). The original put
    # @classmethod outermost, which prevents the validator from being
    # registered on the model.
    @field_validator("timestamp", mode="before")
    @classmethod
    def check_timestamp(cls, value):
        """Accept datetime instances or ISO 8601 strings for ``timestamp``."""
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
            )
|
||||
|
||||
|
||||
class Datasource:
    """Receives processed road points from the store service over a websocket.

    Points are buffered internally and handed to the UI via get_new_points().
    """

    def __init__(self, user_id: int):
        self.index = 0
        self.user_id = user_id
        self.connection_status = None
        self._new_points = []
        # Schedule the websocket client on the already-running asyncio loop.
        asyncio.ensure_future(self.connect_to_server())

    def get_new_points(self):
        """Return points received since the previous call and clear the buffer."""
        Logger.debug(self._new_points)
        points = self._new_points
        self._new_points = []
        return points

    async def connect_to_server(self):
        uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
        # Reconnect forever: after a clean server disconnect, open a new socket.
        while True:
            Logger.debug("CONNECT TO SERVER")
            async with websockets.connect(uri) as websocket:
                self.connection_status = "Connected"
                try:
                    while True:
                        data = await websocket.recv()
                        parsed_data = json.loads(data)
                        self.handle_received_data(parsed_data)
                except websockets.ConnectionClosedOK:
                    self.connection_status = "Disconnected"
                    Logger.debug("SERVER DISCONNECT")

    def handle_received_data(self, data):
        """Sort received samples by timestamp and buffer (lat, lon, state) tuples.

        ``data`` is the payload already decoded once by connect_to_server().
        """
        # Update your UI or perform actions with received data here
        Logger.debug(f"Received data: {data}")
        # BUG FIX: the original unconditionally called json.loads(data) here,
        # but connect_to_server() has already parsed the message, so a list
        # payload raised TypeError. Decode only if the payload arrived
        # double-encoded as a string.
        if isinstance(data, str):
            data = json.loads(data)
        processed_agent_data_list = sorted(
            [
                ProcessedAgentData(**processed_data_json)
                for processed_data_json in data
            ],
            key=lambda v: v.timestamp,
        )
        new_points = [
            (
                processed_agent_data.latitude,
                processed_agent_data.longitude,
                processed_agent_data.road_state,
            )
            for processed_agent_data in processed_agent_data_list
        ]
        self._new_points.extend(new_points)
|
||||
BIN
MapView/images/bump.png
Normal file
BIN
MapView/images/bump.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 2.4 KiB |
BIN
MapView/images/car.png
Normal file
BIN
MapView/images/car.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.0 KiB |
BIN
MapView/images/pothole.png
Normal file
BIN
MapView/images/pothole.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 4.3 KiB |
147
MapView/lineMapLayer.py
Normal file
147
MapView/lineMapLayer.py
Normal file
@ -0,0 +1,147 @@
|
||||
from kivy_garden.mapview import MapLayer, MapMarker
|
||||
from kivy.graphics import Color, Line
|
||||
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
|
||||
from kivy_garden.mapview.utils import clamp
|
||||
from kivy_garden.mapview.constants import (MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE)
|
||||
from math import radians, log, tan, cos, pi
|
||||
|
||||
|
||||
class LineMapLayer(MapLayer):
    """MapView layer that draws a polyline (the vehicle route) on the map.

    Coordinates are (lat, lon) pairs; they are projected to the map source's
    pixel space and redrawn whenever the view moves/zooms or points change.
    """

    def __init__(self, coordinates=None, color=None, width=2, **kwargs):
        super().__init__(**kwargs)
        # if coordinates is None:
        #     coordinates = [[0, 0], [0, 0]]
        self._coordinates = coordinates
        # BUG FIX: the original used a mutable default argument
        # (color=[0, 0, 1, 1]) which would be shared between instances.
        self.color = color if color is not None else [0, 0, 1, 1]
        self._line_points = None
        self._line_points_offset = (0, 0)
        self.zoom = 0
        self.lon = 0
        self.lat = 0
        self.ms = 0  # map size in pixels at the current zoom level
        self._width = width

    @property
    def coordinates(self):
        return self._coordinates

    @coordinates.setter
    def coordinates(self, coordinates):
        self._coordinates = coordinates
        self.invalidate_line_points()
        self.clear_and_redraw()

    def add_point(self, point):
        """Append one (lat, lon) point to the route and redraw the line."""
        if self._coordinates is None:
            #self._coordinates = [point]
            self._coordinates = []
        self._coordinates.append(point)
        # self._coordinates = [self._coordinates[-1], point]
        self.invalidate_line_points()
        self.clear_and_redraw()

    @property
    def line_points(self):
        # Lazily (re)computed projected points.
        if self._line_points is None:
            self.calc_line_points()
        return self._line_points

    @property
    def line_points_offset(self):
        if self._line_points is None:
            self.calc_line_points()
        return self._line_points_offset

    def calc_line_points(self):
        # Offset all points by the coordinates of the first point,
        # to keep coordinates closer to zero.
        # (and therefore avoid some float precision issues when drawing lines)
        self._line_points_offset = (self.get_x(self.coordinates[0][1]),
                                    self.get_y(self.coordinates[0][0]))
        # Since lat is not a linear transform we must compute manually
        self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
                              self.get_y(lat) - self._line_points_offset[1])
                             for lat, lon in self.coordinates]

    def invalidate_line_points(self):
        self._line_points = None
        self._line_points_offset = (0, 0)

    def get_x(self, lon):
        """Get the x position on the map using this map source's projection
        (0, 0) is located at the top left.
        """
        return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0

    def get_y(self, lat):
        """Get the y position on the map using this map source's projection
        (0, 0) is located at the top left.
        """
        lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
        return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0

    # Function called when the MapView is moved
    def reposition(self):
        map_view = self.parent
        # Must redraw when the zoom changes
        # as the scatter transform resets for the new tiles
        if self.zoom != map_view.zoom or \
                self.lon != round(map_view.lon, 7) or \
                self.lat != round(map_view.lat, 7):
            map_source = map_view.map_source
            self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
            self.invalidate_line_points()
            self.clear_and_redraw()

    def clear_and_redraw(self, *args):
        with self.canvas:
            # Clear old line
            self.canvas.clear()
            self._draw_line()

    def _draw_line(self, *args):
        if self._coordinates is None:
            return
        map_view = self.parent
        self.zoom = map_view.zoom
        self.lon = map_view.lon
        self.lat = map_view.lat

        # When zooming we must undo the current scatter transform
        # or the animation distorts it
        scatter = map_view._scatter
        sx, sy, ss = scatter.x, scatter.y, scatter.scale

        # Account for map source tile size and map view zoom
        vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale

        with self.canvas:
            self.opacity = 0.5
            # Save the current coordinate space context
            PushMatrix()

            # Offset by the MapView's position in the window (always 0,0 ?)
            Translate(*map_view.pos)

            # Undo the scatter animation transform
            Scale(1 / ss, 1 / ss, 1)
            Translate(-sx, -sy)

            # Apply the get window xy from transforms
            Scale(vs, vs, 1)
            Translate(-vx, -vy)

            # Apply what we can factor out of the mapsource long, lat to x, y conversion
            Translate(self.ms / 2, 0)

            # Translate by the offset of the line points
            # (this keeps the points closer to the origin)
            Translate(*self.line_points_offset)

            Color(*self.color)
            Line(points=self.line_points, width=self._width)

            # Retrieve the last saved coordinate space context
            PopMatrix()
|
||||
55
MapView/main.py
Normal file
55
MapView/main.py
Normal file
@ -0,0 +1,55 @@
|
||||
import asyncio
|
||||
from kivy.app import App
|
||||
from kivy_garden.mapview import MapMarker, MapView
|
||||
from kivy.clock import Clock
|
||||
from lineMapLayer import LineMapLayer
|
||||
from datasource import Datasource
|
||||
|
||||
|
||||
class MapViewApp(App):
    """Kivy application skeleton for the road-quality map (lab template).

    The methods below are intentionally left as stubs to be implemented;
    their docstrings describe the expected behavior.
    """

    def __init__(self, **kwargs):
        super().__init__()
        # add the required state here (car marker, datasource, route layer, ...)

    def on_start(self):
        """
        Sets up the required markers and schedules the map update calls
        """

    def update(self, *args):
        """
        Called periodically to refresh the map
        """

    def update_car_marker(self, point):
        """
        Updates the position of the car marker on the map
        :param point: GPS coordinates
        """

    def set_pothole_marker(self, point):
        """
        Places a pothole marker
        :param point: GPS coordinates
        """

    def set_bump_marker(self, point):
        """
        Places a speed-bump marker
        :param point: GPS coordinates
        """

    def build(self):
        """
        Initializes the map MapView(zoom, lat, lon)
        :return: the map widget
        """
        self.mapview = MapView()
        return self.mapview
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # asyncio.run() replaces the deprecated get_event_loop() /
    # run_until_complete() / close() sequence (get_event_loop is deprecated
    # for this use since Python 3.10) and closes the loop automatically.
    asyncio.run(MapViewApp().async_run(async_lib="asyncio"))
|
||||
15
MapView/requirements.txt
Normal file
15
MapView/requirements.txt
Normal file
@ -0,0 +1,15 @@
|
||||
annotated-types==0.6.0
|
||||
certifi==2024.2.2
|
||||
charset-normalizer==3.3.2
|
||||
docutils==0.20.1
|
||||
idna==3.6
|
||||
Kivy==2.3.0
|
||||
Kivy-Garden==0.1.5
|
||||
kivy-garden.mapview==1.0.6
|
||||
pydantic==2.6.2
|
||||
pydantic_core==2.16.3
|
||||
Pygments==2.17.2
|
||||
requests==2.31.0
|
||||
typing_extensions==4.10.0
|
||||
urllib3==2.2.1
|
||||
websockets==12.0
|
||||
@ -1,4 +1,3 @@
|
||||
version: "3.9"
|
||||
name: "road_vision"
|
||||
services:
|
||||
mqtt:
|
||||
|
||||
@ -8,6 +8,7 @@ def try_parse(type, value: str):
|
||||
return None
|
||||
|
||||
|
||||
USER_ID = 1
|
||||
# MQTT config
|
||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||
|
||||
@ -9,4 +9,5 @@ from domain.gps import Gps
|
||||
class AggregatedData:
|
||||
accelerometer: Accelerometer
|
||||
gps: Gps
|
||||
time: datetime
|
||||
timestamp: datetime
|
||||
user_id: int
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
from csv import reader
|
||||
from datetime import datetime
|
||||
from domain.accelerometer import Accelerometer
|
||||
from domain.gps import Gps
|
||||
from domain.aggregated_data import AggregatedData
|
||||
import config
|
||||
|
||||
|
||||
class FileDatasource:
|
||||
@ -13,6 +16,12 @@ class FileDatasource:
|
||||
|
||||
def read(self) -> AggregatedData:
|
||||
"""Метод повертає дані отримані з датчиків"""
|
||||
return AggregatedData(
|
||||
Accelerometer(1, 2, 3),
|
||||
Gps(4, 5),
|
||||
datetime.now(),
|
||||
config.USER_ID,
|
||||
)
|
||||
|
||||
def startReading(self, *args, **kwargs):
|
||||
"""Метод повинен викликатись перед початком читання даних"""
|
||||
|
||||
@ -6,4 +6,5 @@ from schema.gps_schema import GpsSchema
|
||||
class AggregatedDataSchema(Schema):
|
||||
accelerometer = fields.Nested(AccelerometerSchema)
|
||||
gps = fields.Nested(GpsSchema)
|
||||
time = fields.DateTime("iso")
|
||||
timestamp = fields.DateTime("iso")
|
||||
user_id = fields.Int()
|
||||
|
||||
2
edge/.gitignore
vendored
Normal file
2
edge/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
venv
|
||||
app.log
|
||||
11
edge/Dockerfile
Normal file
11
edge/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
||||
# Use the official Python image as the base image
|
||||
FROM python:3.9-slim
|
||||
# Set the working directory inside the container
|
||||
WORKDIR /app
|
||||
# Copy the requirements.txt file and install dependencies
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
# Copy the entire application into the container
|
||||
COPY . .
|
||||
# Run the main.py script inside the container when it starts
|
||||
CMD ["python", "main.py"]
|
||||
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
@ -0,0 +1,76 @@
|
||||
import logging
|
||||
import paho.mqtt.client as mqtt
|
||||
from app.interfaces.agent_gateway import AgentGateway
|
||||
from app.entities.agent_data import AgentData, GpsData
|
||||
from app.usecases.data_processing import process_agent_data
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class AgentMQTTAdapter(AgentGateway):
    """Subscribes to agent data on an MQTT topic, processes each sample and
    forwards the result to the hub gateway."""

    def __init__(
        self,
        broker_host,
        broker_port,
        topic,
        hub_gateway: HubGateway,
        batch_size=10,
    ):
        self.batch_size = batch_size
        # MQTT
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topic = topic
        self.client = mqtt.Client()
        # Hub
        self.hub_gateway = hub_gateway

    def on_connect(self, client, userdata, flags, rc):
        """paho-mqtt connect callback: subscribe on success, report failure."""
        if rc == 0:
            logging.info("Connected to MQTT broker")
            self.client.subscribe(self.topic)
        else:
            # BUG FIX: a refused connection is an error condition; the
            # original logged it (and the exceptions below) at INFO level,
            # hiding failures in normal log output.
            logging.error(f"Failed to connect to MQTT broker with code: {rc}")

    def on_message(self, client, userdata, msg):
        """Processing agent data and sent it to hub gateway"""
        try:
            payload: str = msg.payload.decode("utf-8")
            # Create AgentData instance with the received data
            agent_data = AgentData.model_validate_json(payload, strict=True)
            # Process the received data (you can call a use case here if needed)
            processed_data = process_agent_data(agent_data)
            # Store the agent_data in the database (you can send it to the data processing module)
            if not self.hub_gateway.save_data(processed_data):
                logging.error("Hub is not available")
        except Exception as e:
            # A malformed payload must not kill the MQTT loop; log and continue.
            logging.error(f"Error processing MQTT message: {e}")

    def connect(self):
        """Register callbacks and open the connection to the MQTT broker."""
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_host, self.broker_port, 60)

    def start(self):
        """Start the paho network loop in a background thread."""
        self.client.loop_start()

    def stop(self):
        """Stop the background network loop."""
        self.client.loop_stop()
|
||||
|
||||
|
||||
# Usage example:
if __name__ == "__main__":
    import time

    broker_host = "localhost"
    broker_port = 1883
    topic = "agent_data_topic"
    # Assuming you have implemented the StoreGateway and passed it to the adapter
    # NOTE(review): HubGateway is abstract — instantiating it raises TypeError.
    # Use a concrete adapter (e.g. HubMqttAdapter/HubHttpAdapter) here.
    store_gateway = HubGateway()
    adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
    adapter.connect()
    adapter.start()
    try:
        # Keep the adapter running in the background.
        # BUG FIX: the original `while True: pass` busy-waited and burned a
        # full CPU core; sleeping yields the CPU while staying responsive
        # to KeyboardInterrupt.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        adapter.stop()
        logging.info("Adapter stopped.")
|
||||
29
edge/app/adapters/hub_http_adapter.py
Normal file
29
edge/app/adapters/hub_http_adapter.py
Normal file
@ -0,0 +1,29 @@
|
||||
import logging
|
||||
|
||||
import requests as requests
|
||||
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class HubHttpAdapter(HubGateway):
    """Forwards processed road data to the Hub over its HTTP API."""

    def __init__(self, api_base_url):
        self.api_base_url = api_base_url

    def save_data(self, processed_data: ProcessedAgentData):
        """
        Save the processed road data to the Hub.
        Parameters:
        processed_data (ProcessedAgentData): Processed road data to be saved.
        Returns:
        bool: True if the data is successfully saved, False otherwise.
        """
        endpoint = f"{self.api_base_url}/processed_agent_data/"
        response = requests.post(endpoint, data=processed_data.model_dump_json())

        if response.status_code == 200:
            return True

        # Non-200 means the hub rejected (or failed to handle) the payload.
        logging.info(
            f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
        )
        return False
|
||||
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
@ -0,0 +1,50 @@
|
||||
import logging
|
||||
|
||||
import requests as requests
|
||||
from paho.mqtt import client as mqtt_client
|
||||
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class HubMqttAdapter(HubGateway):
    """Publishes processed road data to the Hub via an MQTT topic."""

    def __init__(self, broker, port, topic):
        self.broker = broker
        self.port = port
        self.topic = topic
        self.mqtt_client = self._connect_mqtt(broker, port)

    def save_data(self, processed_data: ProcessedAgentData):
        """
        Save the processed road data to the Hub.
        Parameters:
        processed_data (ProcessedAgentData): Processed road data to be saved.
        Returns:
        bool: True if the data is successfully saved, False otherwise.
        """
        msg = processed_data.model_dump_json()
        result = self.mqtt_client.publish(self.topic, msg)
        # result[0] is the paho publish return code; 0 == MQTT_ERR_SUCCESS.
        status = result[0]
        if status == 0:
            return True
        else:
            print(f"Failed to send message to topic {self.topic}")
            return False

    @staticmethod
    def _connect_mqtt(broker, port):
        """Create MQTT client"""
        print(f"CONNECT TO {broker}:{port}")

        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print(f"Connected to MQTT Broker ({broker}:{port})!")
            else:
                # BUG FIX: the original passed an unformatted literal plus a
                # stray %-style argument to print():
                #   print("Failed to connect {broker}:{port}, return code %d\n", rc)
                # which printed the braces verbatim and never interpolated rc.
                print(f"Failed to connect {broker}:{port}, return code {rc}")
                exit(rc)  # Stop execution

        client = mqtt_client.Client()
        client.on_connect = on_connect
        client.connect(broker, port)
        client.loop_start()
        return client
|
||||
32
edge/app/entities/agent_data.py
Normal file
32
edge/app/entities/agent_data.py
Normal file
@ -0,0 +1,32 @@
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
|
||||
class AccelerometerData(BaseModel):
    """Accelerometer reading along the three axes."""

    x: float
    y: float
    z: float
|
||||
|
||||
|
||||
class GpsData(BaseModel):
    """GPS position in decimal degrees."""

    latitude: float
    longitude: float
|
||||
|
||||
|
||||
class AgentData(BaseModel):
    """One raw sample from the agent: accelerometer + GPS at a point in time."""

    accelerometer: AccelerometerData
    gps: GpsData
    timestamp: datetime

    # BUG FIX: with pydantic v2, @field_validator must be the outermost
    # decorator (applied on top of @classmethod). The original put
    # @classmethod outermost, which prevents the validator from being
    # registered on the model.
    @field_validator("timestamp", mode="before")
    @classmethod
    def parse_timestamp(cls, value):
        """Accept datetime instances or ISO 8601 strings for ``timestamp``."""
        # Convert the timestamp to a datetime object
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
            )
|
||||
7
edge/app/entities/processed_agent_data.py
Normal file
7
edge/app/entities/processed_agent_data.py
Normal file
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
from app.entities.agent_data import AgentData
|
||||
|
||||
|
||||
class ProcessedAgentData(BaseModel):
    """Agent sample together with the classified road surface state."""

    road_state: str
    agent_data: AgentData
|
||||
40
edge/app/interfaces/agent_gateway.py
Normal file
40
edge/app/interfaces/agent_gateway.py
Normal file
@ -0,0 +1,40 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class AgentGateway(ABC):
    """
    Abstract class representing the Agent Gateway interface.
    All agent gateway adapters must implement these methods.
    """

    @abstractmethod
    def on_message(self, client, userdata, msg):
        """
        Method to handle incoming messages from the agent.
        The signature follows the paho-mqtt message-callback convention.
        Parameters:
        client: MQTT client instance.
        userdata: Any additional user data passed to the MQTT client.
        msg: The MQTT message received from the agent.
        """
        pass

    @abstractmethod
    def connect(self):
        """
        Method to establish a connection to the agent.
        """
        pass

    @abstractmethod
    def start(self):
        """
        Method to start listening for messages from the agent.
        """
        pass

    @abstractmethod
    def stop(self):
        """
        Method to stop the agent gateway and clean up resources.
        """
        pass
|
||||
20
edge/app/interfaces/hub_gateway.py
Normal file
20
edge/app/interfaces/hub_gateway.py
Normal file
@ -0,0 +1,20 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
|
||||
class HubGateway(ABC):
    """
    Abstract class representing the Store Gateway interface.
    All store gateway adapters must implement these methods.
    """

    @abstractmethod
    def save_data(self, processed_data: ProcessedAgentData) -> bool:
        """
        Method to save the processed agent data in the database.
        Parameters:
        processed_data (ProcessedAgentData): The processed agent data to be saved.
        Returns:
        bool: True if the data is successfully saved, False otherwise.
        """
        pass
|
||||
15
edge/app/usecases/data_processing.py
Normal file
15
edge/app/usecases/data_processing.py
Normal file
@ -0,0 +1,15 @@
|
||||
from app.entities.agent_data import AgentData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
|
||||
def process_agent_data(
    agent_data: AgentData,
) -> ProcessedAgentData:
    """
    Process agent data and classify the state of the road surface.
    Parameters:
    agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp.
    Returns:
    processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
    """
    # Implement it
    # NOTE(review): still a stub — it implicitly returns None, which violates
    # the declared return type; AgentMQTTAdapter.on_message will pass None to
    # HubGateway.save_data. Classification logic (presumably thresholds on the
    # accelerometer reading — confirm against the lab spec) is missing.
|
||||
24
edge/config.py
Normal file
24
edge/config.py
Normal file
@ -0,0 +1,24 @@
|
||||
import os
|
||||
|
||||
|
||||
def try_parse_int(value: str):
    """Return *value* parsed as int, or None if it is missing or not numeric.

    Used below to read optional integer settings from environment variables.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        # Narrowed from a bare `except Exception`: only the conversion
        # failures we expect (None or a non-numeric string) mean "no value";
        # anything else should surface instead of being swallowed.
        return None
|
||||
|
||||
|
||||
# Configuration for agent MQTT.
# Environment variables override the defaults; empty strings also fall
# through to the defaults via the `or` pattern.
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"

# Configuration for hub MQTT (where processed data is published)
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"

# Configuration for the Hub HTTP endpoint
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
||||
48
edge/docker/docker-compose.yaml
Normal file
48
edge/docker/docker-compose.yaml
Normal file
@ -0,0 +1,48 @@
|
||||
version: "3.9"
|
||||
# name: "road_vision"
|
||||
services:
|
||||
mqtt:
|
||||
image: eclipse-mosquitto
|
||||
container_name: mqtt
|
||||
volumes:
|
||||
- ./mosquitto:/mosquitto
|
||||
- ./mosquitto/data:/mosquitto/data
|
||||
- ./mosquitto/log:/mosquitto/log
|
||||
ports:
|
||||
- 1883:1883
|
||||
- 19001:9001
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
edge:
|
||||
container_name: edge
|
||||
build: ../
|
||||
depends_on:
|
||||
- mqtt
|
||||
environment:
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: " "
|
||||
HUB_HOST: "store"
|
||||
HUB_PORT: 8000
|
||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||
HUB_MQTT_BROKER_PORT: 1883
|
||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||
networks:
|
||||
mqtt_network:
|
||||
edge_hub:
|
||||
|
||||
|
||||
networks:
|
||||
mqtt_network:
|
||||
db_network:
|
||||
edge_hub:
|
||||
hub:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
pgadmin-data:
|
||||
11
edge/docker/mosquitto/config/mosquitto.conf
Normal file
11
edge/docker/mosquitto/config/mosquitto.conf
Normal file
@ -0,0 +1,11 @@
|
||||
persistence true
|
||||
persistence_location /mosquitto/data/
|
||||
listener 1883
|
||||
## Authentication ##
|
||||
allow_anonymous true
|
||||
# allow_anonymous false
|
||||
# password_file /mosquitto/config/password.txt
|
||||
## Log ##
|
||||
log_dest file /mosquitto/log/mosquitto.log
|
||||
log_dest stdout
|
||||
# listener 1883
|
||||
51
edge/main.py
Normal file
51
edge/main.py
Normal file
@ -0,0 +1,51 @@
|
||||
import logging
|
||||
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||
from app.adapters.hub_http_adapter import HubHttpAdapter
|
||||
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
||||
from config import (
|
||||
MQTT_BROKER_HOST,
|
||||
MQTT_BROKER_PORT,
|
||||
MQTT_TOPIC,
|
||||
HUB_URL,
|
||||
HUB_MQTT_BROKER_HOST,
|
||||
HUB_MQTT_BROKER_PORT,
|
||||
HUB_MQTT_TOPIC,
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
    import time

    # Configure logging settings
    logging.basicConfig(
        level=logging.INFO,  # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
        format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
        handlers=[
            logging.StreamHandler(),  # Output log messages to the console
            logging.FileHandler("app.log"),  # Save log messages to a file
        ],
    )
    # Create an instance of the StoreApiAdapter using the configuration
    # hub_adapter = HubHttpAdapter(
    #     api_base_url=HUB_URL,
    # )
    hub_adapter = HubMqttAdapter(
        broker=HUB_MQTT_BROKER_HOST,
        port=HUB_MQTT_BROKER_PORT,
        topic=HUB_MQTT_TOPIC,
    )
    # Create an instance of the AgentMQTTAdapter using the configuration
    agent_adapter = AgentMQTTAdapter(
        broker_host=MQTT_BROKER_HOST,
        broker_port=MQTT_BROKER_PORT,
        topic=MQTT_TOPIC,
        hub_gateway=hub_adapter,
    )
    try:
        # Connect to the MQTT broker and start listening for messages
        agent_adapter.connect()
        agent_adapter.start()
        # Keep the system running indefinitely (you can add other logic as needed)
        # BUG FIX: the original `while True: pass` busy-waited and burned a
        # full CPU core; sleeping yields the CPU while staying responsive
        # to KeyboardInterrupt.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Stop the MQTT adapter and exit gracefully if interrupted by the user
        agent_adapter.stop()
        logging.info("System stopped.")
|
||||
10
edge/requirements.txt
Normal file
10
edge/requirements.txt
Normal file
@ -0,0 +1,10 @@
|
||||
annotated-types==0.6.0
|
||||
certifi==2024.2.2
|
||||
charset-normalizer==3.3.2
|
||||
idna==3.6
|
||||
paho-mqtt==1.6.1
|
||||
pydantic==2.6.1
|
||||
pydantic_core==2.16.2
|
||||
requests==2.31.0
|
||||
typing_extensions==4.9.0
|
||||
urllib3==2.2.0
|
||||
2
hub/.gitignore
vendored
Normal file
2
hub/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
venv
|
||||
__pycache__
|
||||
11
hub/Dockerfile
Normal file
11
hub/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
||||
# Use the official Python image as the base image
|
||||
FROM python:3.9-slim
|
||||
# Set the working directory inside the container
|
||||
WORKDIR /app
|
||||
# Copy the requirements.txt file and install dependencies
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
# Copy the entire application into the container
|
||||
COPY . .
|
||||
# Run the main.py script inside the container when it starts
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||
33
hub/README.md
Normal file
33
hub/README.md
Normal file
@ -0,0 +1,33 @@
|
||||
# Hub
|
||||
## Instructions for Starting the Project
|
||||
To start the Hub, follow these steps:
|
||||
1. Clone the repository to your local machine:
|
||||
```bash
|
||||
git clone https://github.com/Toolf/hub.git
|
||||
cd hub
|
||||
```
|
||||
2. Create and activate a virtual environment (optional but recommended):
|
||||
```bash
|
||||
python -m venv venv
|
||||
source venv/bin/activate # On Windows, use: venv\Scripts\activate
|
||||
```
|
||||
3. Install the project dependencies:
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
4. Run the system:
|
||||
```bash
|
||||
python ./app/main.py
|
||||
```
|
||||
The system will start collecting data from the agent through MQTT and processing it.
|
||||
## Running Tests
|
||||
To run tests for the project, use the following command:
|
||||
```bash
|
||||
python -m unittest discover tests
|
||||
```
|
||||
## Common Commands
|
||||
### 1. Saving Requirements
|
||||
To save the project dependencies to the requirements.txt file:
|
||||
```bash
|
||||
pip freeze > requirements.txt
|
||||
```
|
||||
24
hub/app/adapters/store_api_adapter.py
Normal file
24
hub/app/adapters/store_api_adapter.py
Normal file
@ -0,0 +1,24 @@
|
||||
import json
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
import pydantic_core
|
||||
import requests
|
||||
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from app.interfaces.store_gateway import StoreGateway
|
||||
|
||||
|
||||
class StoreApiAdapter(StoreGateway):
    """Store gateway that persists processed agent data through the Store API."""

    def __init__(self, api_base_url):
        # Base URL of the Store API, e.g. "http://localhost:8000".
        self.api_base_url = api_base_url

    def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
        """
        Save the processed road data to the Store API.

        Parameters:
            processed_agent_data_batch (List[ProcessedAgentData]): Processed road data to be saved.

        Returns:
            bool: True if the data is successfully saved, False otherwise.
        """
        # Serialize through pydantic's JSON path so datetimes become
        # ISO-8601 strings that the API can parse.
        payload = [
            json.loads(item.model_dump_json())
            for item in processed_agent_data_batch
        ]
        try:
            response = requests.post(
                f"{self.api_base_url}/processed_agent_data/",
                json=payload,
            )
        except requests.RequestException as exc:
            logging.error("Failed to reach the Store API: %s", exc)
            return False
        if response.status_code not in (200, 201):
            logging.error(
                "Store API rejected the batch: %s %s",
                response.status_code,
                response.text,
            )
            return False
        return True
|
||||
33
hub/app/entities/agent_data.py
Normal file
33
hub/app/entities/agent_data.py
Normal file
@ -0,0 +1,33 @@
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
|
||||
class AccelerometerData(BaseModel):
    """Accelerometer reading along three axes.

    Units are not fixed by this schema — presumably m/s^2 or g; confirm
    against the agent that produces the data.
    """

    x: float
    y: float
    z: float
|
||||
|
||||
|
||||
class GpsData(BaseModel):
    """GPS position in decimal degrees (WGS84 assumed — TODO confirm)."""

    latitude: float
    longitude: float
|
||||
|
||||
|
||||
class AgentData(BaseModel):
    """A single telemetry sample reported by an agent."""

    user_id: int
    accelerometer: AccelerometerData
    gps: GpsData
    timestamp: datetime

    # FIX: the validator decorator must be the outermost one. The original
    # applied @classmethod above @field_validator, which pydantic v2 does
    # not accept as a registered validator — the correct order is
    # @field_validator on top, @classmethod below.
    @field_validator('timestamp', mode='before')
    @classmethod
    def parse_timestamp(cls, value):
        """Accept either a datetime or an ISO-8601 string for `timestamp`."""
        # Convert the timestamp to a datetime object
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
            )
|
||||
7
hub/app/entities/processed_agent_data.py
Normal file
7
hub/app/entities/processed_agent_data.py
Normal file
@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
from app.entities.agent_data import AgentData
|
||||
|
||||
|
||||
class ProcessedAgentData(BaseModel):
    """Agent telemetry enriched with the classified road surface state."""

    # Classification result as a free-form string (e.g. "normal" in the
    # sibling tests); the valid value set is not constrained here.
    road_state: str
    agent_data: AgentData
|
||||
21
hub/app/interfaces/store_gateway.py
Normal file
21
hub/app/interfaces/store_gateway.py
Normal file
@ -0,0 +1,21 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
|
||||
class StoreGateway(ABC):
    """
    Abstract class representing the Store Gateway interface.
    All store gateway adapters must implement these methods.
    """

    @abstractmethod
    def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
        """
        Method to save the processed agent data in the database.

        Parameters:
            processed_agent_data_batch (List[ProcessedAgentData]): The batch of
                processed agent data to be saved.

        Returns:
            bool: True if the data is successfully saved, False otherwise.
        """
        pass
|
||||
26
hub/config.py
Normal file
26
hub/config.py
Normal file
@ -0,0 +1,26 @@
|
||||
import os
|
||||
|
||||
|
||||
def try_parse_int(value):
    """Parse *value* as an int, returning None when parsing is impossible.

    Used for optional environment variables below:
    ``try_parse_int(os.environ.get(...)) or <default>`` yields the default
    for unset (None) or malformed values.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        # TypeError: value is None (unset env var); ValueError: not numeric.
        # The original caught bare Exception, which would also hide
        # unrelated bugs.
        return None
|
||||
|
||||
|
||||
# Configuration for the Store API.
# NOTE: `get(...) or default` (rather than get(..., default)) deliberately
# treats an *empty* env var the same as an unset one.
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"

# Configuration for Redis (used as the batching buffer).
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379

# Hub logic: number of samples buffered before a batch is forwarded.
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20

# MQTT broker the hub subscribes to for processed agent data.
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"
|
||||
11
hub/docker/db/structure.sql
Normal file
11
hub/docker/db/structure.sql
Normal file
@ -0,0 +1,11 @@
|
||||
-- Flattened storage for ProcessedAgentData: the classified road state plus
-- the embedded agent sample (accelerometer x/y/z, GPS lat/lon, timestamp).
CREATE TABLE processed_agent_data (
    id SERIAL PRIMARY KEY,
    road_state VARCHAR(255) NOT NULL,
    user_id INTEGER NOT NULL,
    x FLOAT,
    y FLOAT,
    z FLOAT,
    latitude FLOAT,
    longitude FLOAT,
    timestamp TIMESTAMP
);
|
||||
111
hub/docker/docker-compose.yaml
Normal file
111
hub/docker/docker-compose.yaml
Normal file
@ -0,0 +1,111 @@
|
||||
version: "3.9"
|
||||
name: "road_vision__hub"
|
||||
services:
|
||||
mqtt:
|
||||
image: eclipse-mosquitto
|
||||
container_name: mqtt
|
||||
volumes:
|
||||
- ./mosquitto:/mosquitto
|
||||
- ./mosquitto/data:/mosquitto/data
|
||||
- ./mosquitto/log:/mosquitto/log
|
||||
ports:
|
||||
- 1883:1883
|
||||
- 9001:9001
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
postgres_db:
|
||||
image: postgres:latest
|
||||
container_name: postgres_db
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_USER: user
|
||||
POSTGRES_PASSWORD: pass
|
||||
POSTGRES_DB: test_db
|
||||
volumes:
|
||||
- postgres_data:/var/lib/postgresql/data
|
||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||
ports:
|
||||
- "5432:5432"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
pgadmin:
|
||||
container_name: pgadmin4
|
||||
image: dpage/pgadmin4
|
||||
restart: always
|
||||
environment:
|
||||
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
||||
PGADMIN_DEFAULT_PASSWORD: root
|
||||
volumes:
|
||||
- pgadmin-data:/var/lib/pgadmin
|
||||
ports:
|
||||
- "5050:80"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
store:
|
||||
container_name: store
|
||||
build: ../../store
|
||||
depends_on:
|
||||
- postgres_db
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_USER: user
|
||||
POSTGRES_PASSWORD: pass
|
||||
POSTGRES_DB: test_db
|
||||
POSTGRES_HOST: postgres_db
|
||||
POSTGRES_PORT: 5432
|
||||
ports:
|
||||
- "8000:8000"
|
||||
networks:
|
||||
db_network:
|
||||
hub_store:
|
||||
|
||||
|
||||
redis:
|
||||
image: redis:latest
|
||||
container_name: redis
|
||||
ports:
|
||||
- "6379:6379"
|
||||
networks:
|
||||
hub_redis:
|
||||
|
||||
|
||||
hub:
|
||||
container_name: hub
|
||||
build: ../
|
||||
depends_on:
|
||||
- mqtt
|
||||
- redis
|
||||
- store
|
||||
environment:
|
||||
STORE_API_HOST: "store"
|
||||
STORE_API_PORT: 8000
|
||||
REDIS_HOST: "redis"
|
||||
REDIS_PORT: 6379
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: "processed_data_topic"
|
||||
BATCH_SIZE: 1
|
||||
ports:
|
||||
- "9000:8000"
|
||||
networks:
|
||||
mqtt_network:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
|
||||
networks:
|
||||
mqtt_network:
|
||||
db_network:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
pgadmin-data:
|
||||
11
hub/docker/mosquitto/config/mosquitto.conf
Normal file
11
hub/docker/mosquitto/config/mosquitto.conf
Normal file
@ -0,0 +1,11 @@
|
||||
persistence true
|
||||
persistence_location /mosquitto/data/
|
||||
listener 1883
|
||||
## Authentication ##
|
||||
allow_anonymous true
|
||||
# allow_anonymous false
|
||||
# password_file /mosquitto/config/password.txt
|
||||
## Log ##
|
||||
log_dest file /mosquitto/log/mosquitto.log
|
||||
log_dest stdout
|
||||
# listener 1883
|
||||
96
hub/main.py
Normal file
96
hub/main.py
Normal file
@ -0,0 +1,96 @@
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from fastapi import FastAPI
|
||||
from redis import Redis
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from config import (
|
||||
STORE_API_BASE_URL,
|
||||
REDIS_HOST,
|
||||
REDIS_PORT,
|
||||
BATCH_SIZE,
|
||||
MQTT_TOPIC,
|
||||
MQTT_BROKER_HOST,
|
||||
MQTT_BROKER_PORT,
|
||||
)
|
||||
|
||||
# Configure logging settings
logging.basicConfig(
    level=logging.INFO,  # Set the log level to INFO (use logging.DEBUG for more detail)
    format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
    handlers=[
        logging.StreamHandler(),  # Output log messages to the console
        logging.FileHandler("app.log"),  # Save log messages to a file
    ],
)
# Redis acts as the buffer that accumulates samples until BATCH_SIZE is reached.
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
# Adapter that forwards completed batches to the Store API.
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)

# FastAPI application exposing the HTTP ingestion endpoint below.
app = FastAPI()
|
||||
|
||||
|
||||
@app.post("/processed_agent_data/")
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
    """Queue one processed sample in Redis; flush to the Store API in batches.

    Samples are pushed on the left and popped from the right so batches are
    forwarded in arrival (FIFO) order.
    """
    redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
    if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
        processed_agent_data_batch: List[ProcessedAgentData] = []
        for _ in range(BATCH_SIZE):
            # FIX: rpop pairs with lpush to drain oldest-first; the original
            # lpop drained newest-first, reversing the batch order.
            item = ProcessedAgentData.model_validate_json(
                redis_client.rpop("processed_agent_data")
            )
            processed_agent_data_batch.append(item)
        store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
    return {"status": "ok"}
|
||||
|
||||
|
||||
# MQTT client that subscribes to the processed-agent-data topic.
client = mqtt.Client()
|
||||
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: subscribe once the connection is up.

    Parameters follow the paho-mqtt v1 callback signature; ``rc`` is the
    connection result code (0 == success).
    """
    if rc == 0:
        logging.info("Connected to MQTT broker")
        client.subscribe(MQTT_TOPIC)
    else:
        # FIX: a refused connection is an error condition — the original
        # logged it at INFO level.
        logging.error(f"Failed to connect to MQTT broker with code: {rc}")
|
||||
|
||||
|
||||
def on_message(client, userdata, msg):
    """paho-mqtt message callback: validate, queue, and batch-forward data.

    The payload must be a JSON-encoded ProcessedAgentData. Valid samples are
    buffered in Redis; once BATCH_SIZE samples are queued they are drained
    (oldest first) and saved through the Store API adapter.
    """
    try:
        payload: str = msg.payload.decode("utf-8")
        # Create ProcessedAgentData instance with the received data
        processed_agent_data = ProcessedAgentData.model_validate_json(
            payload, strict=True
        )

        redis_client.lpush(
            "processed_agent_data", processed_agent_data.model_dump_json()
        )
        if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
            batch: List[ProcessedAgentData] = []
            for _ in range(BATCH_SIZE):
                # FIX: rpop pairs with lpush so the batch drains in FIFO
                # order; the original lpop drained newest-first.
                batch.append(
                    ProcessedAgentData.model_validate_json(
                        redis_client.rpop("processed_agent_data")
                    )
                )
            store_adapter.save_data(processed_agent_data_batch=batch)
    except Exception:
        # Boundary handler: never let a bad message kill the MQTT loop, but
        # record the full traceback (the original logged at INFO with no
        # traceback, and returned a dict that paho-mqtt ignores).
        logging.exception("Error processing MQTT message")
|
||||
|
||||
|
||||
# Wire up the callbacks and connect to the broker.
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)

# Start a background network-loop thread; the FastAPI app (run by uvicorn,
# see the Dockerfile) keeps the process itself alive.
client.loop_start()
|
||||
BIN
hub/requirements.txt
Normal file
BIN
hub/requirements.txt
Normal file
Binary file not shown.
60
hub/tests/test_agent_mqtt_adapter.py
Normal file
60
hub/tests/test_agent_mqtt_adapter.py
Normal file
@ -0,0 +1,60 @@
|
||||
import unittest
|
||||
from unittest.mock import Mock
|
||||
import redis
|
||||
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||
from app.interfaces.store_gateway import StoreGateway
|
||||
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||
from app.usecases.data_processing import process_agent_data_batch
|
||||
|
||||
class TestAgentMQTTAdapter(unittest.TestCase):
    """Unit tests for AgentMQTTAdapter.on_message using mocked Redis/store."""

    def setUp(self):
        # Create a mock StoreGateway for testing
        self.mock_store_gateway = Mock(spec=StoreGateway)
        self.mock_redis = Mock(spec=redis.Redis)
        # Create the AgentMQTTAdapter instance with the mock StoreGateway;
        # batch_size=1 so a single message triggers a save.
        self.agent_adapter = AgentMQTTAdapter(
            broker_host="test_broker",
            broker_port=1234,
            topic="test_topic",
            store_gateway=self.mock_store_gateway,
            redis_client=self.mock_redis,
            batch_size=1,
        )

    def test_on_message_valid_data(self):
        # Test handling of valid incoming MQTT message
        # (Assuming data is in the correct JSON format)
        valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
        mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
        # The adapter is expected to drain the queue with rpop (FIFO).
        self.mock_redis.llen.return_value = 1
        self.mock_redis.rpop.return_value = valid_json_data
        # Call on_message with the mock message
        self.agent_adapter.on_message(None, None, mock_msg)
        # Ensure that the store_gateway's save_data method is called once with the correct arguments
        expected_agent_data = AgentData(
            user_id=1,
            accelerometer=AccelerometerData(
                x=0.1,
                y=0.2,
                z=0.3,
            ),
            gps=GpsData(
                latitude=10.123,
                longitude=20.456,
            ),
            timestamp="2023-07-21T12:34:56Z",
        )
        self.mock_store_gateway.save_data.assert_called_once_with(
            process_agent_data_batch([expected_agent_data])
        )

    def test_on_message_invalid_data(self):
        # Test handling of invalid incoming MQTT message
        # (Assuming data is missing required fields or has incorrect format)
        invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
        mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
        # Call on_message with the mock message
        self.agent_adapter.on_message(None, None, mock_msg)
        # Ensure that the store_gateway's save_data method is not called (due to invalid data)
        self.mock_store_gateway.save_data.assert_not_called()


if __name__ == "__main__":
    unittest.main()
|
||||
72
hub/tests/test_store_api_adapter.py
Normal file
72
hub/tests/test_store_api_adapter.py
Normal file
@ -0,0 +1,72 @@
|
||||
import requests
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
class TestStoreApiAdapter(unittest.TestCase):
    """Unit tests for StoreApiAdapter.save_data with a mocked requests.post.

    NOTE(review): these tests pass a single ProcessedAgentData (not a list)
    and expect a POST to "/agent_data" — verify this matches the adapter's
    actual save_data contract.
    """

    def setUp(self):
        # Create the StoreApiAdapter instance
        self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")

    @patch.object(requests, "post")
    def test_save_data_success(self, mock_post):
        # Test successful saving of data to the Store API
        # Sample processed road data
        agent_data = AgentData(
            user_id=1,
            accelerometer=AccelerometerData(
                x=0.1,
                y=0.2,
                z=0.3,
            ),
            gps=GpsData(
                latitude=10.123,
                longitude=20.456,
            ),
            timestamp="2023-07-21T12:34:56Z",
        )
        processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
        # Mock the response from the Store API
        mock_response = Mock(status_code=201)  # 201 indicates successful creation
        mock_post.return_value = mock_response
        # Call the save_data method
        result = self.store_api_adapter.save_data(processed_data)
        # Ensure that the post method of the mock is called with the correct arguments
        mock_post.assert_called_once_with(
            "http://test-api.com/agent_data", json=processed_data.model_dump()
        )
        # Ensure that the result is True, indicating successful saving
        self.assertTrue(result)

    @patch.object(requests, "post")
    def test_save_data_failure(self, mock_post):
        # Test failure to save data to the Store API
        # Sample processed road data
        agent_data = AgentData(
            user_id=1,
            accelerometer=AccelerometerData(
                x=0.1,
                y=0.2,
                z=0.3,
            ),
            gps=GpsData(
                latitude=10.123,
                longitude=20.456,
            ),
            timestamp="2023-07-21T12:34:56Z",
        )
        processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
        # Mock the response from the Store API
        mock_response = Mock(status_code=400)  # 400 indicates a client error
        mock_post.return_value = mock_response
        # Call the save_data method
        result = self.store_api_adapter.save_data(processed_data)
        # Ensure that the post method of the mock is called with the correct arguments
        mock_post.assert_called_once_with(
            "http://test-api.com/agent_data", json=processed_data.model_dump()
        )
        # Ensure that the result is False, indicating failure to save
        self.assertFalse(result)


if __name__ == "__main__":
    unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user