Compare commits

...

10 Commits

Author SHA1 Message Date
VladiusVostokus
b2c7427af0
Merge pull request #1 from Rhinemann/lab1_shved
updated compose file
2026-02-23 16:10:12 +00:00
e45faeb281 updated compose file 2026-02-22 11:22:19 +01:00
diana-tym
b8db2fe6ee refactor: change task 2024-03-13 15:14:48 +02:00
Toolf
0c5954f96c add: mapview datasource implementation 2024-02-26 15:16:47 +02:00
Toolf
3a4fa51aa9 fix: naming 2024-02-26 14:54:25 +02:00
Toolf
e32ba94adc add: MapView template 2024-02-26 14:53:24 +02:00
Toolf
93cc8d7378 fix: aggregated_data model 2024-02-16 15:49:19 +02:00
Toolf
b65670861d add: edge docker-compose.yaml 2024-02-15 16:04:22 +02:00
Toolf
9a179e09e9 add: edge template 2024-02-12 18:21:08 +02:00
Toolf
173a61d117 add: hub template 2024-02-12 18:18:38 +02:00
46 changed files with 1366 additions and 3 deletions

3
MapView/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
.idea
venv
__pycache__

102
MapView/README.md Normal file
View File

@ -0,0 +1,102 @@
## Лабораторна робота №5
### Тема
Візуалізація якості стану дорожнього покриття за допомогою фреймворку Kivy.
### Мета
Розробити програму для візуалізації руху машини на дорозі та якості дороги за допомогою даних датчиків.
### Підготовка робочого середовище, встановлення проекту
Створення віртуального середовища
`python -m venv ./venv`
Активація середовища для linux
`source ./venv/bin/activate`
Активація середовища для windows
`venv\Scripts\activate`
Встановлення необхідних бібліотек
`pip install -r requirements.txt`
### Завдання
Для відображення мапи використовується віджет [Mapview](https://mapview.readthedocs.io/en/1.0.4/) для Kivy.
Для візуалізації руху машини можна використовувати MapMarker та рухати його відповідно GPS даних.
Для позначення нерівностей на дорозі також використовувати маркери. Зображення для маркерів можна знайти в папці images.
Для створення та редагування маршруту машини на мапі використовуйте клас LineMapLayer та функцію
`add_point()` з файлу lineMapLayer.py. Додавання лінії на мапу:
```python
map_layer = LineMapLayer()
mapview.add_layer(map_layer, mode="scatter")
```
Щоб створити затримку при відображенні руху машини можна використовувати
функцію `kivy.clock.Clock.schedule_once()` або `kivy.clock.Clock.schedule_interval()`.
Дані для відображення на мапі (координати та стан дороги) зчитуються з бази даних через вебсокет.
Для їх отримання використовуйте функцію `get_new_points()` з datasource.py.
**Шаблон основного файлу проєкту**
```python
from kivy.app import App
from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock
from lineMapLayer import LineMapLayer
class MapViewApp(App):
def __init__(self, **kwargs):
super().__init__()
# додати необхідні змінні
def on_start(self):
"""
Встановлює необхідні маркери, викликає функцію для оновлення мапи
"""
def update(self, *args):
"""
Викликається регулярно для оновлення мапи
"""
def update_car_marker(self, point):
"""
Оновлює відображення маркера машини на мапі
:param point: GPS координати
"""
def set_pothole_marker(self, point):
"""
Встановлює маркер для ями
:param point: GPS координати
"""
def set_bump_marker(self, point):
"""
Встановлює маркер для лежачого поліцейського
:param point: GPS координати
"""
def build(self):
"""
Ініціалізує мапу MapView(zoom, lat, lon)
:return: мапу
"""
self.mapview = MapView()
return self.mapview
if __name__ == '__main__':
MapViewApp().run()
```

4
MapView/config.py Normal file
View File

@ -0,0 +1,4 @@
import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000

1
MapView/data.csv Normal file

File diff suppressed because one or more lines are too long

81
MapView/datasource.py Normal file
View File

@ -0,0 +1,81 @@
import asyncio
import json
from datetime import datetime
import websockets
from kivy import Logger
from pydantic import BaseModel, field_validator
from config import STORE_HOST, STORE_PORT
# Pydantic models
class ProcessedAgentData(BaseModel):
road_state: str
user_id: int
x: float
y: float
z: float
latitude: float
longitude: float
timestamp: datetime
@classmethod
@field_validator("timestamp", mode="before")
def check_timestamp(cls, value):
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)
class Datasource:
def __init__(self, user_id: int):
self.index = 0
self.user_id = user_id
self.connection_status = None
self._new_points = []
asyncio.ensure_future(self.connect_to_server())
def get_new_points(self):
Logger.debug(self._new_points)
points = self._new_points
self._new_points = []
return points
async def connect_to_server(self):
uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
while True:
Logger.debug("CONNECT TO SERVER")
async with websockets.connect(uri) as websocket:
self.connection_status = "Connected"
try:
while True:
data = await websocket.recv()
parsed_data = json.loads(data)
self.handle_received_data(parsed_data)
except websockets.ConnectionClosedOK:
self.connection_status = "Disconnected"
Logger.debug("SERVER DISCONNECT")
def handle_received_data(self, data):
# Update your UI or perform actions with received data here
Logger.debug(f"Received data: {data}")
processed_agent_data_list = sorted(
[
ProcessedAgentData(**processed_data_json)
for processed_data_json in json.loads(data)
],
key=lambda v: v.timestamp,
)
new_points = [
(
processed_agent_data.latitude,
processed_agent_data.longitude,
processed_agent_data.road_state,
)
for processed_agent_data in processed_agent_data_list
]
self._new_points.extend(new_points)

BIN
MapView/images/bump.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

BIN
MapView/images/car.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.0 KiB

BIN
MapView/images/pothole.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 KiB

147
MapView/lineMapLayer.py Normal file
View File

@ -0,0 +1,147 @@
from kivy_garden.mapview import MapLayer, MapMarker
from kivy.graphics import Color, Line
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
from kivy_garden.mapview.utils import clamp
from kivy_garden.mapview.constants import (MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE)
from math import radians, log, tan, cos, pi
class LineMapLayer(MapLayer):
def __init__(self, coordinates=None, color=[0, 0, 1, 1], width=2, **kwargs):
super().__init__(**kwargs)
# if coordinates is None:
# coordinates = [[0, 0], [0, 0]]
self._coordinates = coordinates
self.color = color
self._line_points = None
self._line_points_offset = (0, 0)
self.zoom = 0
self.lon = 0
self.lat = 0
self.ms = 0
self._width = width
@property
def coordinates(self):
return self._coordinates
@coordinates.setter
def coordinates(self, coordinates):
self._coordinates = coordinates
self.invalidate_line_points()
self.clear_and_redraw()
def add_point(self, point):
if self._coordinates is None:
#self._coordinates = [point]
self._coordinates = []
self._coordinates.append(point)
# self._coordinates = [self._coordinates[-1], point]
self.invalidate_line_points()
self.clear_and_redraw()
@property
def line_points(self):
if self._line_points is None:
self.calc_line_points()
return self._line_points
@property
def line_points_offset(self):
if self._line_points is None:
self.calc_line_points()
return self._line_points_offset
def calc_line_points(self):
# Offset all points by the coordinates of the first point,
# to keep coordinates closer to zero.
# (and therefore avoid some float precision issues when drawing lines)
self._line_points_offset = (self.get_x(self.coordinates[0][1]),
self.get_y(self.coordinates[0][0]))
# Since lat is not a linear transform we must compute manually
self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
self.get_y(lat) - self._line_points_offset[1])
for lat, lon in self.coordinates]
def invalidate_line_points(self):
self._line_points = None
self._line_points_offset = (0, 0)
def get_x(self, lon):
"""Get the x position on the map using this map source's projection
(0, 0) is located at the top left.
"""
return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0
def get_y(self, lat):
"""Get the y position on the map using this map source's projection
(0, 0) is located at the top left.
"""
lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0
# Function called when the MapView is moved
def reposition(self):
map_view = self.parent
# Must redraw when the zoom changes
# as the scatter transform resets for the new tiles
if self.zoom != map_view.zoom or \
self.lon != round(map_view.lon, 7) or \
self.lat != round(map_view.lat, 7):
map_source = map_view.map_source
self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
self.invalidate_line_points()
self.clear_and_redraw()
def clear_and_redraw(self, *args):
with self.canvas:
# Clear old line
self.canvas.clear()
self._draw_line()
def _draw_line(self, *args):
if self._coordinates is None:
return
map_view = self.parent
self.zoom = map_view.zoom
self.lon = map_view.lon
self.lat = map_view.lat
# When zooming we must undo the current scatter transform
# or the animation distorts it
scatter = map_view._scatter
sx, sy, ss = scatter.x, scatter.y, scatter.scale
# Account for map source tile size and map view zoom
vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale
with self.canvas:
self.opacity = 0.5
# Save the current coordinate space context
PushMatrix()
# Offset by the MapView's position in the window (always 0,0 ?)
Translate(*map_view.pos)
# Undo the scatter animation transform
Scale(1 / ss, 1 / ss, 1)
Translate(-sx, -sy)
# Apply the get window xy from transforms
Scale(vs, vs, 1)
Translate(-vx, -vy)
# Apply what we can factor out of the mapsource long, lat to x, y conversion
Translate(self.ms / 2, 0)
# Translate by the offset of the line points
# (this keeps the points closer to the origin)
Translate(*self.line_points_offset)
Color(*self.color)
Line(points=self.line_points, width=self._width)
# Retrieve the last saved coordinate space context
PopMatrix()

55
MapView/main.py Normal file
View File

@ -0,0 +1,55 @@
import asyncio
from kivy.app import App
from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock
from lineMapLayer import LineMapLayer
from datasource import Datasource
class MapViewApp(App):
def __init__(self, **kwargs):
super().__init__()
# додати необхідні змінні
def on_start(self):
"""
Встановлює необхідні маркери, викликає функцію для оновлення мапи
"""
def update(self, *args):
"""
Викликається регулярно для оновлення мапи
"""
def update_car_marker(self, point):
"""
Оновлює відображення маркера машини на мапі
:param point: GPS координати
"""
def set_pothole_marker(self, point):
"""
Встановлює маркер для ями
:param point: GPS координати
"""
def set_bump_marker(self, point):
"""
Встановлює маркер для лежачого поліцейського
:param point: GPS координати
"""
def build(self):
"""
Ініціалізує мапу MapView(zoom, lat, lon)
:return: мапу
"""
self.mapview = MapView()
return self.mapview
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(MapViewApp().async_run(async_lib="asyncio"))
loop.close()

15
MapView/requirements.txt Normal file
View File

@ -0,0 +1,15 @@
annotated-types==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
docutils==0.20.1
idna==3.6
Kivy==2.3.0
Kivy-Garden==0.1.5
kivy-garden.mapview==1.0.6
pydantic==2.6.2
pydantic_core==2.16.3
Pygments==2.17.2
requests==2.31.0
typing_extensions==4.10.0
urllib3==2.2.1
websockets==12.0

View File

@ -1,4 +1,3 @@
version: "3.9"
name: "road_vision" name: "road_vision"
services: services:
mqtt: mqtt:

View File

@ -8,6 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@ -9,4 +9,5 @@ from domain.gps import Gps
class AggregatedData: class AggregatedData:
accelerometer: Accelerometer accelerometer: Accelerometer
gps: Gps gps: Gps
time: datetime timestamp: datetime
user_id: int

View File

@ -1,6 +1,9 @@
from csv import reader from csv import reader
from datetime import datetime from datetime import datetime
from domain.accelerometer import Accelerometer
from domain.gps import Gps
from domain.aggregated_data import AggregatedData from domain.aggregated_data import AggregatedData
import config
class FileDatasource: class FileDatasource:
@ -13,6 +16,12 @@ class FileDatasource:
def read(self) -> AggregatedData: def read(self) -> AggregatedData:
"""Метод повертає дані отримані з датчиків""" """Метод повертає дані отримані з датчиків"""
return AggregatedData(
Accelerometer(1, 2, 3),
Gps(4, 5),
datetime.now(),
config.USER_ID,
)
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Метод повинен викликатись перед початком читання даних""" """Метод повинен викликатись перед початком читання даних"""

View File

@ -6,4 +6,5 @@ from schema.gps_schema import GpsSchema
class AggregatedDataSchema(Schema): class AggregatedDataSchema(Schema):
accelerometer = fields.Nested(AccelerometerSchema) accelerometer = fields.Nested(AccelerometerSchema)
gps = fields.Nested(GpsSchema) gps = fields.Nested(GpsSchema)
time = fields.DateTime("iso") timestamp = fields.DateTime("iso")
user_id = fields.Int()

2
edge/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
venv
app.log

11
edge/Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY . .
# Run the main.py script inside the container when it starts
CMD ["python", "main.py"]

View File

@ -0,0 +1,76 @@
import logging
import paho.mqtt.client as mqtt
from app.interfaces.agent_gateway import AgentGateway
from app.entities.agent_data import AgentData, GpsData
from app.usecases.data_processing import process_agent_data
from app.interfaces.hub_gateway import HubGateway
class AgentMQTTAdapter(AgentGateway):
def __init__(
self,
broker_host,
broker_port,
topic,
hub_gateway: HubGateway,
batch_size=10,
):
self.batch_size = batch_size
# MQTT
self.broker_host = broker_host
self.broker_port = broker_port
self.topic = topic
self.client = mqtt.Client()
# Hub
self.hub_gateway = hub_gateway
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker")
self.client.subscribe(self.topic)
else:
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
def on_message(self, client, userdata, msg):
"""Processing agent data and sent it to hub gateway"""
try:
payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available")
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")
def connect(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60)
def start(self):
self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")

View File

@ -0,0 +1,29 @@
import logging
import requests as requests
from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.hub_gateway import HubGateway
class HubHttpAdapter(HubGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def save_data(self, processed_data: ProcessedAgentData):
"""
Save the processed road data to the Hub.
Parameters:
processed_data (ProcessedAgentData): Processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
url = f"{self.api_base_url}/processed_agent_data/"
response = requests.post(url, data=processed_data.model_dump_json())
if response.status_code != 200:
logging.info(
f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
)
return False
return True

View File

@ -0,0 +1,50 @@
import logging
import requests as requests
from paho.mqtt import client as mqtt_client
from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.hub_gateway import HubGateway
class HubMqttAdapter(HubGateway):
def __init__(self, broker, port, topic):
self.broker = broker
self.port = port
self.topic = topic
self.mqtt_client = self._connect_mqtt(broker, port)
def save_data(self, processed_data: ProcessedAgentData):
"""
Save the processed road data to the Hub.
Parameters:
processed_data (ProcessedAgentData): Processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
msg = processed_data.model_dump_json()
result = self.mqtt_client.publish(self.topic, msg)
status = result[0]
if status == 0:
return True
else:
print(f"Failed to send message to topic {self.topic}")
return False
@staticmethod
def _connect_mqtt(broker, port):
"""Create MQTT client"""
print(f"CONNECT TO {broker}:{port}")
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(f"Connected to MQTT Broker ({broker}:{port})!")
else:
print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution
client = mqtt_client.Client()
client.on_connect = on_connect
client.connect(broker, port)
client.loop_start()
return client

View File

@ -0,0 +1,32 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@classmethod
@field_validator("timestamp", mode="before")
def parse_timestamp(cls, value):
# Convert the timestamp to a datetime object
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
from app.entities.agent_data import AgentData
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData

View File

@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
class AgentGateway(ABC):
"""
Abstract class representing the Agent Gateway interface.
All agent gateway adapters must implement these methods.
"""
@abstractmethod
def on_message(self, client, userdata, msg):
"""
Method to handle incoming messages from the agent.
Parameters:
client: MQTT client instance.
userdata: Any additional user data passed to the MQTT client.
msg: The MQTT message received from the agent.
"""
pass
@abstractmethod
def connect(self):
"""
Method to establish a connection to the agent.
"""
pass
@abstractmethod
def start(self):
"""
Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
"""
pass

View File

@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from app.entities.processed_agent_data import ProcessedAgentData
class HubGateway(ABC):
"""
Abstract class representing the Store Gateway interface.
All store gateway adapters must implement these methods.
"""
@abstractmethod
def save_data(self, processed_data: ProcessedAgentData) -> bool:
"""
Method to save the processed agent data in the database.
Parameters:
processed_data (ProcessedAgentData): The processed agent data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
pass

View File

@ -0,0 +1,15 @@
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData
def process_agent_data(
agent_data: AgentData,
) -> ProcessedAgentData:
"""
Process agent data and classify the state of the road surface.
Parameters:
agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp.
Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
"""
# Implement it

24
edge/config.py Normal file
View File

@ -0,0 +1,24 @@
import os
def try_parse_int(value: str):
try:
return int(value)
except Exception:
return None
# Configuration for agent MQTT
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
# Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"

View File

@ -0,0 +1,48 @@
version: "3.9"
# name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 19001:9001
networks:
mqtt_network:
edge:
container_name: edge
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " "
HUB_HOST: "store"
HUB_PORT: 8000
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
networks:
mqtt_network:
edge_hub:
networks:
mqtt_network:
db_network:
edge_hub:
hub:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:

View File

@ -0,0 +1,11 @@
persistence true
persistence_location /mosquitto/data/
listener 1883
## Authentication ##
allow_anonymous true
# allow_anonymous false
# password_file /mosquitto/config/password.txt
## Log ##
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
# listener 1883

51
edge/main.py Normal file
View File

@ -0,0 +1,51 @@
import logging
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
from app.adapters.hub_http_adapter import HubHttpAdapter
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
from config import (
MQTT_BROKER_HOST,
MQTT_BROKER_PORT,
MQTT_TOPIC,
HUB_URL,
HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC,
)
if __name__ == "__main__":
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter(
# api_base_url=HUB_URL,
# )
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the configuration
agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC,
hub_gateway=hub_adapter,
)
try:
# Connect to the MQTT broker and start listening for messages
agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed)
while True:
pass
except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.stop()
logging.info("System stopped.")

10
edge/requirements.txt Normal file
View File

@ -0,0 +1,10 @@
annotated-types==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
idna==3.6
paho-mqtt==1.6.1
pydantic==2.6.1
pydantic_core==2.16.2
requests==2.31.0
typing_extensions==4.9.0
urllib3==2.2.0

2
hub/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
venv
__pycache__

11
hub/Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY . .
# Run the main.py script inside the container when it starts
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]

33
hub/README.md Normal file
View File

@ -0,0 +1,33 @@
# Hub
## Instructions for Starting the Project
To start the Hub, follow these steps:
1. Clone the repository to your local machine:
```bash
git clone https://github.com/Toolf/hub.git
cd hub
```
2. Create and activate a virtual environment (optional but recommended):
```bash
python -m venv venv
source venv/bin/activate # On Windows, use: venv\Scripts\activate
```
3. Install the project dependencies:
```bash
pip install -r requirements.txt
```
4. Run the system:
```bash
python ./app/main.py
```
The system will start collecting data from the agent through MQTT and processing it.
## Running Tests
To run tests for the project, use the following command:
```bash
python -m unittest discover tests
```
## Common Commands
### 1. Saving Requirements
To save the project dependencies to the requirements.txt file:
```bash
pip freeze > requirements.txt
```

View File

@ -0,0 +1,24 @@
import json
import logging
from typing import List
import pydantic_core
import requests
from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.store_gateway import StoreGateway
class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
"""
Save the processed road data to the Store API.
Parameters:
processed_agent_data_batch (dict): Processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
# Implement it

View File

@ -0,0 +1,33 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@classmethod
@field_validator('timestamp', mode='before')
def parse_timestamp(cls, value):
# Convert the timestamp to a datetime object
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
from app.entities.agent_data import AgentData
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData

View File

@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from typing import List
from app.entities.processed_agent_data import ProcessedAgentData
class StoreGateway(ABC):
"""
Abstract class representing the Store Gateway interface.
All store gateway adapters must implement these methods.
"""
@abstractmethod
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
"""
Method to save the processed agent data in the database.
Parameters:
processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
pass

26
hub/config.py Normal file
View File

@ -0,0 +1,26 @@
import os
def try_parse_int(value: str):
try:
return int(value)
except Exception:
return None
# Configuration for the Store API
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
# Configure for Redis
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
# Configure for hub logic
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
# MQTT
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"

View File

@ -0,0 +1,11 @@
CREATE TABLE processed_agent_data (
id SERIAL PRIMARY KEY,
road_state VARCHAR(255) NOT NULL,
user_id INTEGER NOT NULL,
x FLOAT,
y FLOAT,
z FLOAT,
latitude FLOAT,
longitude FLOAT,
timestamp TIMESTAMP
);

View File

@ -0,0 +1,111 @@
version: "3.9"
name: "road_vision__hub"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
postgres_db:
image: postgres:latest
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
volumes:
- pgadmin-data:/var/lib/pgadmin
ports:
- "5050:80"
networks:
db_network:
store:
container_name: store
build: ../../store
depends_on:
- postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
ports:
- "8000:8000"
networks:
db_network:
hub_store:
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
networks:
hub_redis:
hub:
container_name: hub
build: ../
depends_on:
- mqtt
- redis
- store
environment:
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"
REDIS_PORT: 6379
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "processed_data_topic"
BATCH_SIZE: 1
ports:
- "9000:8000"
networks:
mqtt_network:
hub_store:
hub_redis:
networks:
mqtt_network:
db_network:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:

View File

@ -0,0 +1,11 @@
persistence true
persistence_location /mosquitto/data/
listener 1883
## Authentication ##
allow_anonymous true
# allow_anonymous false
# password_file /mosquitto/config/password.txt
## Log ##
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
# listener 1883

96
hub/main.py Normal file
View File

@ -0,0 +1,96 @@
import logging
from typing import List
from fastapi import FastAPI
from redis import Redis
import paho.mqtt.client as mqtt
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.processed_agent_data import ProcessedAgentData
from config import (
STORE_API_BASE_URL,
REDIS_HOST,
REDIS_PORT,
BATCH_SIZE,
MQTT_TOPIC,
MQTT_BROKER_HOST,
MQTT_BROKER_PORT,
)
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Create an instance of the Redis using the configuration
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
# Create an instance of the StoreApiAdapter using the configuration
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
# Create an instance of the AgentMQTTAdapter using the configuration
# FastAPI
app = FastAPI()
@app.post("/processed_agent_data/")
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
processed_agent_data_batch: List[ProcessedAgentData] = []
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
print(processed_agent_data_batch)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
# MQTT
client = mqtt.Client()
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker")
client.subscribe(MQTT_TOPIC)
else:
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
def on_message(client, userdata, msg):
try:
payload: str = msg.payload.decode("utf-8")
# Create ProcessedAgentData instance with the received data
processed_agent_data = ProcessedAgentData.model_validate_json(
payload, strict=True
)
redis_client.lpush(
"processed_agent_data", processed_agent_data.model_dump_json()
)
processed_agent_data_batch: List[ProcessedAgentData] = []
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")
# Connect
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
# Start
client.loop_start()

BIN
hub/requirements.txt Normal file

Binary file not shown.

View File

@ -0,0 +1,60 @@
import unittest
from unittest.mock import Mock
import redis
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
from app.interfaces.store_gateway import StoreGateway
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.usecases.data_processing import process_agent_data_batch
class TestAgentMQTTAdapter(unittest.TestCase):
def setUp(self):
# Create a mock StoreGateway for testing
self.mock_store_gateway = Mock(spec=StoreGateway)
self.mock_redis = Mock(spec=redis.Redis)
# Create the AgentMQTTAdapter instance with the mock StoreGateway
self.agent_adapter = AgentMQTTAdapter(
broker_host="test_broker",
broker_port=1234,
topic="test_topic",
store_gateway=self.mock_store_gateway,
redis_client=self.mock_redis,
batch_size=1,
)
def test_on_message_valid_data(self):
# Test handling of valid incoming MQTT message
# (Assuming data is in the correct JSON format)
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
self.mock_redis.llen.return_value = 1
self.mock_redis.rpop.return_value = valid_json_data
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is called once with the correct arguments
expected_agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
self.mock_store_gateway.save_data.assert_called_once_with(
process_agent_data_batch([expected_agent_data])
)
def test_on_message_invalid_data(self):
# Test handling of invalid incoming MQTT message
# (Assuming data is missing required fields or has incorrect format)
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
self.mock_store_gateway.save_data.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,72 @@
import requests
import unittest
from unittest.mock import Mock, patch
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
class TestStoreApiAdapter(unittest.TestCase):
def setUp(self):
# Create the StoreApiAdapter instance
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
@patch.object(requests, "post")
def test_save_data_success(self, mock_post):
# Test successful saving of data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=201) # 201 indicates successful creation
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is True, indicating successful saving
self.assertTrue(result)
@patch.object(requests, "post")
def test_save_data_failure(self, mock_post):
# Test failure to save data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=400) # 400 indicates a client error
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is False, indicating failure to save
self.assertFalse(result)
if __name__ == "__main__":
unittest.main()