Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 564fe329f3 | |||
| 75613fd4fc | |||
| a25fbfc3ef | |||
| ca790e7306 | |||
| 1643767094 | |||
| 3d94bf3008 | |||
| 29196ba400 | |||
| 72e9f65b27 | |||
| 2b6bed70d8 | |||
| 21ee14ccd1 | |||
| c5d98d53cd | |||
| 07a0e906d8 | |||
| b330180909 | |||
| c974ac32f6 | |||
| b2c7427af0 | |||
| e45faeb281 | |||
| b8db2fe6ee | |||
| 0c5954f96c | |||
| 3a4fa51aa9 | |||
| e32ba94adc | |||
| 93cc8d7378 | |||
| b65670861d | |||
| 9a179e09e9 | |||
| 173a61d117 |
@@ -0,0 +1,25 @@
|
|||||||
|
name: Test Agent
|
||||||
|
on: [push, workflow_dispatch]
|
||||||
|
jobs:
|
||||||
|
test-agent-run:
|
||||||
|
runs-on: arch-x86_64
|
||||||
|
steps:
|
||||||
|
- name: Fetch the repository
|
||||||
|
run: git clone --branch ${{ gitea.ref_name }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
- name: Build containers
|
||||||
|
run: docker-compose -f docker-compose-test.yaml build
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start MQTT broker
|
||||||
|
run: docker-compose -f docker-compose-test.yaml up -d mqtt
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start agent
|
||||||
|
run: docker-compose -f docker-compose-test.yaml run fake_agent
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Clean up
|
||||||
|
if: always()
|
||||||
|
run: docker-compose -f docker-compose-test.yaml down
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
agent/docker/mosquitto/data/
|
||||||
|
agent/docker/mosquitto/log/
|
||||||
@@ -0,0 +1,3 @@
|
|||||||
|
.idea
|
||||||
|
venv
|
||||||
|
__pycache__
|
||||||
@@ -0,0 +1,102 @@
|
|||||||
|
## Лабораторна робота №5
|
||||||
|
### Тема
|
||||||
|
|
||||||
|
Візуалізація якості стану дорожнього покриття за допомогою фреймворку Kivy.
|
||||||
|
|
||||||
|
### Мета
|
||||||
|
|
||||||
|
Розробити програму для візуалізації руху машини на дорозі та якості дороги за допомогою даних датчиків.
|
||||||
|
|
||||||
|
### Підготовка робочого середовище, встановлення проекту
|
||||||
|
|
||||||
|
Створення віртуального середовища
|
||||||
|
|
||||||
|
`python -m venv ./venv`
|
||||||
|
|
||||||
|
Активація середовища для linux
|
||||||
|
|
||||||
|
`source ./venv/bin/activate`
|
||||||
|
|
||||||
|
Активація середовища для windows
|
||||||
|
|
||||||
|
`venv\Scripts\activate`
|
||||||
|
|
||||||
|
Встановлення необхідних бібліотек
|
||||||
|
|
||||||
|
`pip install -r requirements.txt`
|
||||||
|
|
||||||
|
### Завдання
|
||||||
|
|
||||||
|
Для відображення мапи використовується віджет [Mapview](https://mapview.readthedocs.io/en/1.0.4/) для Kivy.
|
||||||
|
|
||||||
|
Для візуалізації руху машини можна використовувати MapMarker та рухати його відповідно GPS даних.
|
||||||
|
Для позначення нерівностей на дорозі також використовувати маркери. Зображення для маркерів можна знайти в папці images.
|
||||||
|
|
||||||
|
Для створення та редагування маршруту машини на мапі використовуйте клас LineMapLayer та функцію
|
||||||
|
`add_point()` з файлу lineMapLayer.py. Додавання лінії на мапу:
|
||||||
|
|
||||||
|
```python
|
||||||
|
map_layer = LineMapLayer()
|
||||||
|
mapview.add_layer(map_layer, mode="scatter")
|
||||||
|
```
|
||||||
|
|
||||||
|
Щоб створити затримку при відображенні руху машини можна використовувати
|
||||||
|
функцію `kivy.clock.Clock.schedule_once()` або `kivy.clock.Clock.schedule_interval()`.
|
||||||
|
|
||||||
|
Дані для відображення на мапі (координати та стан дороги) зчитуються з бази даних через вебсокет.
|
||||||
|
Для їх отримання використовуйте функцію `get_new_points()` з datasource.py.
|
||||||
|
|
||||||
|
**Шаблон основного файлу проєкту**
|
||||||
|
|
||||||
|
```python
|
||||||
|
from kivy.app import App
|
||||||
|
from kivy_garden.mapview import MapMarker, MapView
|
||||||
|
from kivy.clock import Clock
|
||||||
|
from lineMapLayer import LineMapLayer
|
||||||
|
|
||||||
|
|
||||||
|
class MapViewApp(App):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__()
|
||||||
|
# додати необхідні змінні
|
||||||
|
|
||||||
|
def on_start(self):
|
||||||
|
"""
|
||||||
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update(self, *args):
|
||||||
|
"""
|
||||||
|
Викликається регулярно для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update_car_marker(self, point):
|
||||||
|
"""
|
||||||
|
Оновлює відображення маркера машини на мапі
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_pothole_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для ями
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_bump_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для лежачого поліцейського
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
"""
|
||||||
|
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||||
|
:return: мапу
|
||||||
|
"""
|
||||||
|
self.mapview = MapView()
|
||||||
|
return self.mapview
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
MapViewApp().run()
|
||||||
|
```
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
||||||
|
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
||||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,81 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
import websockets
|
||||||
|
from kivy import Logger
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
from config import STORE_HOST, STORE_PORT
|
||||||
|
|
||||||
|
|
||||||
|
# Pydantic models
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
user_id: int
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def check_timestamp(cls, value):
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Datasource:
|
||||||
|
def __init__(self, user_id: int):
|
||||||
|
self.index = 0
|
||||||
|
self.user_id = user_id
|
||||||
|
self.connection_status = None
|
||||||
|
self._new_points = []
|
||||||
|
asyncio.ensure_future(self.connect_to_server())
|
||||||
|
|
||||||
|
def get_new_points(self):
|
||||||
|
Logger.debug(self._new_points)
|
||||||
|
points = self._new_points
|
||||||
|
self._new_points = []
|
||||||
|
return points
|
||||||
|
|
||||||
|
async def connect_to_server(self):
|
||||||
|
uri = f"ws://{STORE_HOST}:{STORE_PORT}/ws/{self.user_id}"
|
||||||
|
while True:
|
||||||
|
Logger.debug("CONNECT TO SERVER")
|
||||||
|
async with websockets.connect(uri) as websocket:
|
||||||
|
self.connection_status = "Connected"
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await websocket.recv()
|
||||||
|
parsed_data = json.loads(data)
|
||||||
|
self.handle_received_data(parsed_data)
|
||||||
|
except websockets.ConnectionClosedOK:
|
||||||
|
self.connection_status = "Disconnected"
|
||||||
|
Logger.debug("SERVER DISCONNECT")
|
||||||
|
|
||||||
|
def handle_received_data(self, data):
|
||||||
|
# Update your UI or perform actions with received data here
|
||||||
|
Logger.debug(f"Received data: {data}")
|
||||||
|
processed_agent_data_list = sorted(
|
||||||
|
[
|
||||||
|
ProcessedAgentData(**processed_data_json)
|
||||||
|
for processed_data_json in json.loads(data)
|
||||||
|
],
|
||||||
|
key=lambda v: v.timestamp,
|
||||||
|
)
|
||||||
|
new_points = [
|
||||||
|
(
|
||||||
|
processed_agent_data.latitude,
|
||||||
|
processed_agent_data.longitude,
|
||||||
|
processed_agent_data.road_state,
|
||||||
|
)
|
||||||
|
for processed_agent_data in processed_agent_data_list
|
||||||
|
]
|
||||||
|
self._new_points.extend(new_points)
|
||||||
Binary file not shown.
|
After Width: | Height: | Size: 2.4 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 4.0 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 4.3 KiB |
@@ -0,0 +1,147 @@
|
|||||||
|
from kivy_garden.mapview import MapLayer, MapMarker
|
||||||
|
from kivy.graphics import Color, Line
|
||||||
|
from kivy.graphics.context_instructions import Translate, Scale, PushMatrix, PopMatrix
|
||||||
|
from kivy_garden.mapview.utils import clamp
|
||||||
|
from kivy_garden.mapview.constants import (MIN_LONGITUDE, MAX_LONGITUDE, MIN_LATITUDE, MAX_LATITUDE)
|
||||||
|
from math import radians, log, tan, cos, pi
|
||||||
|
|
||||||
|
|
||||||
|
class LineMapLayer(MapLayer):
|
||||||
|
def __init__(self, coordinates=None, color=[0, 0, 1, 1], width=2, **kwargs):
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
# if coordinates is None:
|
||||||
|
# coordinates = [[0, 0], [0, 0]]
|
||||||
|
self._coordinates = coordinates
|
||||||
|
self.color = color
|
||||||
|
self._line_points = None
|
||||||
|
self._line_points_offset = (0, 0)
|
||||||
|
self.zoom = 0
|
||||||
|
self.lon = 0
|
||||||
|
self.lat = 0
|
||||||
|
self.ms = 0
|
||||||
|
self._width = width
|
||||||
|
|
||||||
|
@property
|
||||||
|
def coordinates(self):
|
||||||
|
return self._coordinates
|
||||||
|
|
||||||
|
@coordinates.setter
|
||||||
|
def coordinates(self, coordinates):
|
||||||
|
self._coordinates = coordinates
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
def add_point(self, point):
|
||||||
|
if self._coordinates is None:
|
||||||
|
#self._coordinates = [point]
|
||||||
|
self._coordinates = []
|
||||||
|
self._coordinates.append(point)
|
||||||
|
# self._coordinates = [self._coordinates[-1], point]
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def line_points(self):
|
||||||
|
if self._line_points is None:
|
||||||
|
self.calc_line_points()
|
||||||
|
return self._line_points
|
||||||
|
|
||||||
|
@property
|
||||||
|
def line_points_offset(self):
|
||||||
|
if self._line_points is None:
|
||||||
|
self.calc_line_points()
|
||||||
|
return self._line_points_offset
|
||||||
|
|
||||||
|
def calc_line_points(self):
|
||||||
|
# Offset all points by the coordinates of the first point,
|
||||||
|
# to keep coordinates closer to zero.
|
||||||
|
# (and therefore avoid some float precision issues when drawing lines)
|
||||||
|
self._line_points_offset = (self.get_x(self.coordinates[0][1]),
|
||||||
|
self.get_y(self.coordinates[0][0]))
|
||||||
|
# Since lat is not a linear transform we must compute manually
|
||||||
|
self._line_points = [(self.get_x(lon) - self._line_points_offset[0],
|
||||||
|
self.get_y(lat) - self._line_points_offset[1])
|
||||||
|
for lat, lon in self.coordinates]
|
||||||
|
|
||||||
|
def invalidate_line_points(self):
|
||||||
|
self._line_points = None
|
||||||
|
self._line_points_offset = (0, 0)
|
||||||
|
|
||||||
|
def get_x(self, lon):
|
||||||
|
"""Get the x position on the map using this map source's projection
|
||||||
|
(0, 0) is located at the top left.
|
||||||
|
"""
|
||||||
|
return clamp(lon, MIN_LONGITUDE, MAX_LONGITUDE) * self.ms / 360.0
|
||||||
|
|
||||||
|
def get_y(self, lat):
|
||||||
|
"""Get the y position on the map using this map source's projection
|
||||||
|
(0, 0) is located at the top left.
|
||||||
|
"""
|
||||||
|
lat = radians(clamp(-lat, MIN_LATITUDE, MAX_LATITUDE))
|
||||||
|
return (1.0 - log(tan(lat) + 1.0 / cos(lat)) / pi) * self.ms / 2.0
|
||||||
|
|
||||||
|
# Function called when the MapView is moved
|
||||||
|
def reposition(self):
|
||||||
|
map_view = self.parent
|
||||||
|
|
||||||
|
# Must redraw when the zoom changes
|
||||||
|
# as the scatter transform resets for the new tiles
|
||||||
|
if self.zoom != map_view.zoom or \
|
||||||
|
self.lon != round(map_view.lon, 7) or \
|
||||||
|
self.lat != round(map_view.lat, 7):
|
||||||
|
map_source = map_view.map_source
|
||||||
|
self.ms = pow(2.0, map_view.zoom) * map_source.dp_tile_size
|
||||||
|
self.invalidate_line_points()
|
||||||
|
self.clear_and_redraw()
|
||||||
|
|
||||||
|
def clear_and_redraw(self, *args):
|
||||||
|
with self.canvas:
|
||||||
|
# Clear old line
|
||||||
|
self.canvas.clear()
|
||||||
|
|
||||||
|
self._draw_line()
|
||||||
|
|
||||||
|
def _draw_line(self, *args):
|
||||||
|
if self._coordinates is None:
|
||||||
|
return
|
||||||
|
map_view = self.parent
|
||||||
|
self.zoom = map_view.zoom
|
||||||
|
self.lon = map_view.lon
|
||||||
|
self.lat = map_view.lat
|
||||||
|
|
||||||
|
# When zooming we must undo the current scatter transform
|
||||||
|
# or the animation distorts it
|
||||||
|
scatter = map_view._scatter
|
||||||
|
sx, sy, ss = scatter.x, scatter.y, scatter.scale
|
||||||
|
|
||||||
|
# Account for map source tile size and map view zoom
|
||||||
|
vx, vy, vs = map_view.viewport_pos[0], map_view.viewport_pos[1], map_view.scale
|
||||||
|
|
||||||
|
with self.canvas:
|
||||||
|
self.opacity = 0.5
|
||||||
|
# Save the current coordinate space context
|
||||||
|
PushMatrix()
|
||||||
|
|
||||||
|
# Offset by the MapView's position in the window (always 0,0 ?)
|
||||||
|
Translate(*map_view.pos)
|
||||||
|
|
||||||
|
# Undo the scatter animation transform
|
||||||
|
Scale(1 / ss, 1 / ss, 1)
|
||||||
|
Translate(-sx, -sy)
|
||||||
|
|
||||||
|
# Apply the get window xy from transforms
|
||||||
|
Scale(vs, vs, 1)
|
||||||
|
Translate(-vx, -vy)
|
||||||
|
|
||||||
|
# Apply what we can factor out of the mapsource long, lat to x, y conversion
|
||||||
|
Translate(self.ms / 2, 0)
|
||||||
|
|
||||||
|
# Translate by the offset of the line points
|
||||||
|
# (this keeps the points closer to the origin)
|
||||||
|
Translate(*self.line_points_offset)
|
||||||
|
|
||||||
|
Color(*self.color)
|
||||||
|
Line(points=self.line_points, width=self._width)
|
||||||
|
|
||||||
|
# Retrieve the last saved coordinate space context
|
||||||
|
PopMatrix()
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
import asyncio
|
||||||
|
from kivy.app import App
|
||||||
|
from kivy_garden.mapview import MapMarker, MapView
|
||||||
|
from kivy.clock import Clock
|
||||||
|
from lineMapLayer import LineMapLayer
|
||||||
|
from datasource import Datasource
|
||||||
|
|
||||||
|
|
||||||
|
class MapViewApp(App):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
super().__init__()
|
||||||
|
# додати необхідні змінні
|
||||||
|
|
||||||
|
def on_start(self):
|
||||||
|
"""
|
||||||
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
def update(self, *args):
|
||||||
|
"""
|
||||||
|
Викликається регулярно для оновлення мапи
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def update_car_marker(self, point):
|
||||||
|
"""
|
||||||
|
Оновлює відображення маркера машини на мапі
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_pothole_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для ями
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def set_bump_marker(self, point):
|
||||||
|
"""
|
||||||
|
Встановлює маркер для лежачого поліцейського
|
||||||
|
:param point: GPS координати
|
||||||
|
"""
|
||||||
|
|
||||||
|
def build(self):
|
||||||
|
"""
|
||||||
|
Ініціалізує мапу MapView(zoom, lat, lon)
|
||||||
|
:return: мапу
|
||||||
|
"""
|
||||||
|
self.mapview = MapView()
|
||||||
|
return self.mapview
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(MapViewApp().async_run(async_lib="asyncio"))
|
||||||
|
loop.close()
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
annotated-types==0.6.0
|
||||||
|
certifi==2024.2.2
|
||||||
|
charset-normalizer==3.3.2
|
||||||
|
docutils==0.20.1
|
||||||
|
idna==3.6
|
||||||
|
Kivy==2.3.0
|
||||||
|
Kivy-Garden==0.1.5
|
||||||
|
kivy-garden.mapview==1.0.6
|
||||||
|
pydantic==2.6.2
|
||||||
|
pydantic_core==2.16.3
|
||||||
|
Pygments==2.17.2
|
||||||
|
requests==2.31.0
|
||||||
|
typing_extensions==4.10.0
|
||||||
|
urllib3==2.2.1
|
||||||
|
websockets==12.0
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
version: "3.3"
|
||||||
|
#name: "road_vision"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 9001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
fake_agent:
|
||||||
|
container_name: agent
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
|
DELAY: 0.1
|
||||||
|
MAX_SENDS: 30
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
version: "3.9"
|
version: "3.3"
|
||||||
name: "road_vision"
|
#name: "road_vision"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
image: eclipse-mosquitto
|
image: eclipse-mosquitto
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ def try_parse(type, value: str):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
USER_ID = 1
|
||||||
# MQTT config
|
# MQTT config
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
@@ -15,3 +16,6 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
|
|||||||
|
|
||||||
# Delay for sending data to mqtt in seconds
|
# Delay for sending data to mqtt in seconds
|
||||||
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
||||||
|
|
||||||
|
# Testing switches for CI/CD
|
||||||
|
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))
|
||||||
|
|||||||
@@ -9,4 +9,5 @@ from domain.gps import Gps
|
|||||||
class AggregatedData:
|
class AggregatedData:
|
||||||
accelerometer: Accelerometer
|
accelerometer: Accelerometer
|
||||||
gps: Gps
|
gps: Gps
|
||||||
time: datetime
|
timestamp: datetime
|
||||||
|
user_id: int
|
||||||
|
|||||||
+222
-11
@@ -1,21 +1,232 @@
|
|||||||
from csv import reader
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from domain.accelerometer import Accelerometer
|
||||||
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
|
import config
|
||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
accelerometer_filename: str,
|
|
||||||
gps_filename: str,
|
|
||||||
) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
self.accelerometer_filename = accelerometer_filename
|
||||||
|
self.gps_filename = gps_filename
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
# one-row buffers (supports CSVs with or without header)
|
||||||
|
self._acc_buf: Optional[List[str]] = None
|
||||||
|
self._gps_buf: Optional[List[str]] = None
|
||||||
|
|
||||||
|
self._acc_has_header: Optional[bool] = None
|
||||||
|
self._gps_has_header: Optional[bool] = None
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Must be called before read()"""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not Path(self.accelerometer_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
|
if not Path(self.gps_filename).exists():
|
||||||
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
|
self._open_files()
|
||||||
|
self._started = True
|
||||||
|
|
||||||
def stopReading(self, *args, **kwargs):
|
def stopReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись для закінчення читання даних"""
|
"""Must be called when finishing reading"""
|
||||||
|
self._close_files()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def read(self) -> AggregatedData:
|
||||||
|
"""Return one combined sample (acc + gps)."""
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
|
acc_row = self._next_acc_row()
|
||||||
|
gps_row = self._next_gps_row()
|
||||||
|
|
||||||
|
acc = self._parse_acc(acc_row)
|
||||||
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
|
if config.DELAY and config.DELAY > 0:
|
||||||
|
time.sleep(float(config.DELAY))
|
||||||
|
|
||||||
|
return AggregatedData(
|
||||||
|
accelerometer=acc,
|
||||||
|
gps=gps,
|
||||||
|
timestamp=datetime.utcnow(),
|
||||||
|
user_id=config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------- internal ----------------
|
||||||
|
|
||||||
|
def _open_files(self) -> None:
|
||||||
|
self._close_files()
|
||||||
|
|
||||||
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _close_files(self) -> None:
|
||||||
|
for f in (self._acc_f, self._gps_f):
|
||||||
|
try:
|
||||||
|
if f is not None:
|
||||||
|
f.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader = None
|
||||||
|
self._gps_reader = None
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
self._acc_has_header = None
|
||||||
|
self._gps_has_header = None
|
||||||
|
|
||||||
|
def _rewind_acc(self) -> None:
|
||||||
|
if self._acc_f is None:
|
||||||
|
raise RuntimeError("Accelerometer file is not open.")
|
||||||
|
self._acc_f.seek(0)
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _rewind_gps(self) -> None:
|
||||||
|
if self._gps_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
self._gps_f.seek(0)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _next_acc_row(self) -> List[str]:
|
||||||
|
if self._acc_reader is None:
|
||||||
|
raise RuntimeError("Accelerometer reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._acc_buf is not None:
|
||||||
|
row = self._acc_buf
|
||||||
|
self._acc_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._acc_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_acc()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
def _next_gps_row(self) -> List[str]:
|
||||||
|
if self._gps_reader is None:
|
||||||
|
raise RuntimeError("GPS reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._gps_buf is not None:
|
||||||
|
row = self._gps_buf
|
||||||
|
self._gps_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._gps_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_gps()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _detect_header_and_buffer(
|
||||||
|
rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...]
|
||||||
|
) -> tuple[bool, Optional[List[str]]]:
|
||||||
|
|
||||||
|
first = None
|
||||||
|
while True:
|
||||||
|
first = next(rdr, None)
|
||||||
|
if first is None:
|
||||||
|
return False, None
|
||||||
|
first = [c.strip() for c in first]
|
||||||
|
if first and any(first):
|
||||||
|
break
|
||||||
|
|
||||||
|
norm = [c.lower() for c in first]
|
||||||
|
|
||||||
|
# Header if it contains the expected tokens
|
||||||
|
if all(tok in norm for tok in header_tokens):
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
# If first row is numeric-like and has enough columns => it's data (buffer it back)
|
||||||
|
if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]):
|
||||||
|
return False, first
|
||||||
|
|
||||||
|
# Otherwise treat it as header-ish (skip it)
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
|
if len(row) < 3:
|
||||||
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
x = int(row[0])
|
||||||
|
y = int(row[1])
|
||||||
|
z = int(row[2])
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(
|
||||||
|
f"Invalid accelerometer values (expected integers): {row}"
|
||||||
|
) from e
|
||||||
|
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_gps(row: List[str]) -> Gps:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
return Gps(longitude=lon, latitude=lat)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_number(s: str) -> bool:
|
||||||
|
try:
|
||||||
|
float(s)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|||||||
+12
-11
@@ -1,6 +1,4 @@
|
|||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||||
from file_datasource import FileDatasource
|
from file_datasource import FileDatasource
|
||||||
import config
|
import config
|
||||||
@@ -24,29 +22,32 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource, delay):
|
def publish(client, topic, datasource, max_sends = None):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
|
|
||||||
|
i = 0
|
||||||
while True:
|
while True:
|
||||||
time.sleep(delay)
|
i += 1
|
||||||
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
# result: [0, 1]
|
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status == 0:
|
if status != 0:
|
||||||
pass
|
|
||||||
# print(f"Send `{msg}` to topic `{topic}`")
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
if max_sends and i >= max_sends:
|
||||||
|
# display test success
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -6,4 +6,5 @@ from schema.gps_schema import GpsSchema
|
|||||||
class AggregatedDataSchema(Schema):
|
class AggregatedDataSchema(Schema):
|
||||||
accelerometer = fields.Nested(AccelerometerSchema)
|
accelerometer = fields.Nested(AccelerometerSchema)
|
||||||
gps = fields.Nested(GpsSchema)
|
gps = fields.Nested(GpsSchema)
|
||||||
time = fields.DateTime("iso")
|
timestamp = fields.DateTime("iso")
|
||||||
|
user_id = fields.Int()
|
||||||
|
|||||||
@@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
app.log
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:3.9-slim
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY . .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
CMD ["python", "main.py"]
|
||||||
@@ -0,0 +1,76 @@
|
|||||||
|
import logging
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
from app.interfaces.agent_gateway import AgentGateway
|
||||||
|
from app.entities.agent_data import AgentData, GpsData
|
||||||
|
from app.usecases.data_processing import process_agent_data
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class AgentMQTTAdapter(AgentGateway):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
broker_host,
|
||||||
|
broker_port,
|
||||||
|
topic,
|
||||||
|
hub_gateway: HubGateway,
|
||||||
|
batch_size=10,
|
||||||
|
):
|
||||||
|
self.batch_size = batch_size
|
||||||
|
# MQTT
|
||||||
|
self.broker_host = broker_host
|
||||||
|
self.broker_port = broker_port
|
||||||
|
self.topic = topic
|
||||||
|
self.client = mqtt.Client()
|
||||||
|
# Hub
|
||||||
|
self.hub_gateway = hub_gateway
|
||||||
|
|
||||||
|
def on_connect(self, client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
logging.info("Connected to MQTT broker")
|
||||||
|
self.client.subscribe(self.topic)
|
||||||
|
else:
|
||||||
|
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
||||||
|
|
||||||
|
def on_message(self, client, userdata, msg):
|
||||||
|
"""Processing agent data and sent it to hub gateway"""
|
||||||
|
try:
|
||||||
|
payload: str = msg.payload.decode("utf-8")
|
||||||
|
# Create AgentData instance with the received data
|
||||||
|
agent_data = AgentData.model_validate_json(payload, strict=True)
|
||||||
|
# Process the received data (you can call a use case here if needed)
|
||||||
|
processed_data = process_agent_data(agent_data)
|
||||||
|
# Store the agent_data in the database (you can send it to the data processing module)
|
||||||
|
if not self.hub_gateway.save_data(processed_data):
|
||||||
|
logging.error("Hub is not available")
|
||||||
|
except Exception as e:
|
||||||
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.client.on_connect = self.on_connect
|
||||||
|
self.client.on_message = self.on_message
|
||||||
|
self.client.connect(self.broker_host, self.broker_port, 60)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.client.loop_start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.client.loop_stop()
|
||||||
|
|
||||||
|
|
||||||
|
# Usage example:
|
||||||
|
if __name__ == "__main__":
|
||||||
|
broker_host = "localhost"
|
||||||
|
broker_port = 1883
|
||||||
|
topic = "agent_data_topic"
|
||||||
|
# Assuming you have implemented the StoreGateway and passed it to the adapter
|
||||||
|
store_gateway = HubGateway()
|
||||||
|
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
|
||||||
|
adapter.connect()
|
||||||
|
adapter.start()
|
||||||
|
try:
|
||||||
|
# Keep the adapter running in the background
|
||||||
|
while True:
|
||||||
|
pass
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
adapter.stop()
|
||||||
|
logging.info("Adapter stopped.")
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import requests as requests
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class HubHttpAdapter(HubGateway):
|
||||||
|
def __init__(self, api_base_url):
|
||||||
|
self.api_base_url = api_base_url
|
||||||
|
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Hub.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
url = f"{self.api_base_url}/processed_agent_data/"
|
||||||
|
|
||||||
|
response = requests.post(url, data=processed_data.model_dump_json())
|
||||||
|
if response.status_code != 200:
|
||||||
|
logging.info(
|
||||||
|
f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
return True
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
import requests as requests
|
||||||
|
from paho.mqtt import client as mqtt_client
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.hub_gateway import HubGateway
|
||||||
|
|
||||||
|
|
||||||
|
class HubMqttAdapter(HubGateway):
|
||||||
|
def __init__(self, broker, port, topic):
|
||||||
|
self.broker = broker
|
||||||
|
self.port = port
|
||||||
|
self.topic = topic
|
||||||
|
self.mqtt_client = self._connect_mqtt(broker, port)
|
||||||
|
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Hub.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
msg = processed_data.model_dump_json()
|
||||||
|
result = self.mqtt_client.publish(self.topic, msg)
|
||||||
|
status = result[0]
|
||||||
|
if status == 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"Failed to send message to topic {self.topic}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _connect_mqtt(broker, port):
|
||||||
|
"""Create MQTT client"""
|
||||||
|
print(f"CONNECT TO {broker}:{port}")
|
||||||
|
|
||||||
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
print(f"Connected to MQTT Broker ({broker}:{port})!")
|
||||||
|
else:
|
||||||
|
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||||
|
exit(rc) # Stop execution
|
||||||
|
|
||||||
|
client = mqtt_client.Client()
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.connect(broker, port)
|
||||||
|
client.loop_start()
|
||||||
|
return client
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator("timestamp", mode="before")
|
||||||
|
def parse_timestamp(cls, value):
|
||||||
|
# Convert the timestamp to a datetime object
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
from pydantic import BaseModel
|
||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
@@ -0,0 +1,40 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
|
class AgentGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Agent Gateway interface.
|
||||||
|
All agent gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def on_message(self, client, userdata, msg):
|
||||||
|
"""
|
||||||
|
Method to handle incoming messages from the agent.
|
||||||
|
Parameters:
|
||||||
|
client: MQTT client instance.
|
||||||
|
userdata: Any additional user data passed to the MQTT client.
|
||||||
|
msg: The MQTT message received from the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def connect(self):
|
||||||
|
"""
|
||||||
|
Method to establish a connection to the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def start(self):
|
||||||
|
"""
|
||||||
|
Method to start listening for messages from the agent.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def stop(self):
|
||||||
|
"""
|
||||||
|
Method to stop the agent gateway and clean up resources.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
class HubGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Store Gateway interface.
|
||||||
|
All store gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def save_data(self, processed_data: ProcessedAgentData) -> bool:
|
||||||
|
"""
|
||||||
|
Method to save the processed agent data in the database.
|
||||||
|
Parameters:
|
||||||
|
processed_data (ProcessedAgentData): The processed agent data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
def process_agent_data(
|
||||||
|
agent_data: AgentData,
|
||||||
|
) -> ProcessedAgentData:
|
||||||
|
"""
|
||||||
|
Process agent data and classify the state of the road surface.
|
||||||
|
Parameters:
|
||||||
|
agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp.
|
||||||
|
Returns:
|
||||||
|
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||||
|
"""
|
||||||
|
# Implement it
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def try_parse_int(value: str):
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Configuration for agent MQTT
|
||||||
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
||||||
|
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
|
||||||
|
|
||||||
|
# Configuration for hub MQTT
|
||||||
|
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
||||||
|
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
||||||
|
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
|
||||||
|
|
||||||
|
# Configuration for the Hub
|
||||||
|
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
||||||
|
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
|
||||||
|
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
version: "3.9"
|
||||||
|
# name: "road_vision"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 19001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
user: 1000:1000
|
||||||
|
|
||||||
|
|
||||||
|
edge:
|
||||||
|
container_name: edge
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: " "
|
||||||
|
HUB_HOST: "store"
|
||||||
|
HUB_PORT: 8000
|
||||||
|
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||||
|
HUB_MQTT_BROKER_PORT: 1883
|
||||||
|
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
edge_hub:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
db_network:
|
||||||
|
edge_hub:
|
||||||
|
hub:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
pgadmin-data:
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
persistence true
|
||||||
|
persistence_location /mosquitto/data/
|
||||||
|
listener 1883
|
||||||
|
## Authentication ##
|
||||||
|
allow_anonymous true
|
||||||
|
# allow_anonymous false
|
||||||
|
# password_file /mosquitto/config/password.txt
|
||||||
|
## Log ##
|
||||||
|
log_dest file /mosquitto/log/mosquitto.log
|
||||||
|
log_dest stdout
|
||||||
|
# listener 1883
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
import logging
|
||||||
|
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||||
|
from app.adapters.hub_http_adapter import HubHttpAdapter
|
||||||
|
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
||||||
|
from config import (
|
||||||
|
MQTT_BROKER_HOST,
|
||||||
|
MQTT_BROKER_PORT,
|
||||||
|
MQTT_TOPIC,
|
||||||
|
HUB_URL,
|
||||||
|
HUB_MQTT_BROKER_HOST,
|
||||||
|
HUB_MQTT_BROKER_PORT,
|
||||||
|
HUB_MQTT_TOPIC,
|
||||||
|
)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Configure logging settings
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
||||||
|
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(), # Output log messages to the console
|
||||||
|
logging.FileHandler("app.log"), # Save log messages to a file
|
||||||
|
],
|
||||||
|
)
|
||||||
|
# Create an instance of the StoreApiAdapter using the configuration
|
||||||
|
# hub_adapter = HubHttpAdapter(
|
||||||
|
# api_base_url=HUB_URL,
|
||||||
|
# )
|
||||||
|
hub_adapter = HubMqttAdapter(
|
||||||
|
broker=HUB_MQTT_BROKER_HOST,
|
||||||
|
port=HUB_MQTT_BROKER_PORT,
|
||||||
|
topic=HUB_MQTT_TOPIC,
|
||||||
|
)
|
||||||
|
# Create an instance of the AgentMQTTAdapter using the configuration
|
||||||
|
agent_adapter = AgentMQTTAdapter(
|
||||||
|
broker_host=MQTT_BROKER_HOST,
|
||||||
|
broker_port=MQTT_BROKER_PORT,
|
||||||
|
topic=MQTT_TOPIC,
|
||||||
|
hub_gateway=hub_adapter,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
# Connect to the MQTT broker and start listening for messages
|
||||||
|
agent_adapter.connect()
|
||||||
|
agent_adapter.start()
|
||||||
|
# Keep the system running indefinitely (you can add other logic as needed)
|
||||||
|
while True:
|
||||||
|
pass
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
||||||
|
agent_adapter.stop()
|
||||||
|
logging.info("System stopped.")
|
||||||
@@ -0,0 +1,10 @@
|
|||||||
|
annotated-types==0.6.0
|
||||||
|
certifi==2024.2.2
|
||||||
|
charset-normalizer==3.3.2
|
||||||
|
idna==3.6
|
||||||
|
paho-mqtt==1.6.1
|
||||||
|
pydantic==2.6.1
|
||||||
|
pydantic_core==2.16.2
|
||||||
|
requests==2.31.0
|
||||||
|
typing_extensions==4.9.0
|
||||||
|
urllib3==2.2.0
|
||||||
@@ -0,0 +1,2 @@
|
|||||||
|
venv
|
||||||
|
__pycache__
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
# Use the official Python image as the base image
|
||||||
|
FROM python:3.9-slim
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
# Copy the requirements.txt file and install dependencies
|
||||||
|
COPY requirements.txt .
|
||||||
|
RUN pip install --no-cache-dir -r requirements.txt
|
||||||
|
# Copy the entire application into the container
|
||||||
|
COPY . .
|
||||||
|
# Run the main.py script inside the container when it starts
|
||||||
|
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
# Hub
|
||||||
|
## Instructions for Starting the Project
|
||||||
|
To start the Hub, follow these steps:
|
||||||
|
1. Clone the repository to your local machine:
|
||||||
|
```bash
|
||||||
|
git clone https://github.com/Toolf/hub.git
|
||||||
|
cd hub
|
||||||
|
```
|
||||||
|
2. Create and activate a virtual environment (optional but recommended):
|
||||||
|
```bash
|
||||||
|
python -m venv venv
|
||||||
|
source venv/bin/activate # On Windows, use: venv\Scripts\activate
|
||||||
|
```
|
||||||
|
3. Install the project dependencies:
|
||||||
|
```bash
|
||||||
|
pip install -r requirements.txt
|
||||||
|
```
|
||||||
|
4. Run the system:
|
||||||
|
```bash
|
||||||
|
python ./app/main.py
|
||||||
|
```
|
||||||
|
The system will start collecting data from the agent through MQTT and processing it.
|
||||||
|
## Running Tests
|
||||||
|
To run tests for the project, use the following command:
|
||||||
|
```bash
|
||||||
|
python -m unittest discover tests
|
||||||
|
```
|
||||||
|
## Common Commands
|
||||||
|
### 1. Saving Requirements
|
||||||
|
To save the project dependencies to the requirements.txt file:
|
||||||
|
```bash
|
||||||
|
pip freeze > requirements.txt
|
||||||
|
```
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import pydantic_core
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from app.interfaces.store_gateway import StoreGateway
|
||||||
|
|
||||||
|
|
||||||
|
class StoreApiAdapter(StoreGateway):
|
||||||
|
def __init__(self, api_base_url):
|
||||||
|
self.api_base_url = api_base_url
|
||||||
|
|
||||||
|
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||||
|
"""
|
||||||
|
Save the processed road data to the Store API.
|
||||||
|
Parameters:
|
||||||
|
processed_agent_data_batch (dict): Processed road data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
# Implement it
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from pydantic import BaseModel, field_validator
|
||||||
|
|
||||||
|
|
||||||
|
class AccelerometerData(BaseModel):
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float
|
||||||
|
|
||||||
|
|
||||||
|
class GpsData(BaseModel):
|
||||||
|
latitude: float
|
||||||
|
longitude: float
|
||||||
|
|
||||||
|
|
||||||
|
class AgentData(BaseModel):
|
||||||
|
user_id: int
|
||||||
|
accelerometer: AccelerometerData
|
||||||
|
gps: GpsData
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
@field_validator('timestamp', mode='before')
|
||||||
|
def parse_timestamp(cls, value):
|
||||||
|
# Convert the timestamp to a datetime object
|
||||||
|
if isinstance(value, datetime):
|
||||||
|
return value
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
|
||||||
|
)
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
from pydantic import BaseModel
|
||||||
|
from app.entities.agent_data import AgentData
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessedAgentData(BaseModel):
|
||||||
|
road_state: str
|
||||||
|
agent_data: AgentData
|
||||||
@@ -0,0 +1,21 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import List
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
|
||||||
|
class StoreGateway(ABC):
|
||||||
|
"""
|
||||||
|
Abstract class representing the Store Gateway interface.
|
||||||
|
All store gateway adapters must implement these methods.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
|
||||||
|
"""
|
||||||
|
Method to save the processed agent data in the database.
|
||||||
|
Parameters:
|
||||||
|
processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved.
|
||||||
|
Returns:
|
||||||
|
bool: True if the data is successfully saved, False otherwise.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def try_parse_int(value: str):
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Configuration for the Store API
|
||||||
|
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
|
||||||
|
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
|
||||||
|
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
|
||||||
|
|
||||||
|
# Configure for Redis
|
||||||
|
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
|
||||||
|
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
|
||||||
|
|
||||||
|
# Configure for hub logic
|
||||||
|
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
|
||||||
|
|
||||||
|
# MQTT
|
||||||
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
|
||||||
|
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
CREATE TABLE processed_agent_data (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
road_state VARCHAR(255) NOT NULL,
|
||||||
|
user_id INTEGER NOT NULL,
|
||||||
|
x FLOAT,
|
||||||
|
y FLOAT,
|
||||||
|
z FLOAT,
|
||||||
|
latitude FLOAT,
|
||||||
|
longitude FLOAT,
|
||||||
|
timestamp TIMESTAMP
|
||||||
|
);
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
version: "3.9"
|
||||||
|
name: "road_vision__hub"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 9001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
postgres_db:
|
||||||
|
image: postgres:latest
|
||||||
|
container_name: postgres_db
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: user
|
||||||
|
POSTGRES_PASSWORD: pass
|
||||||
|
POSTGRES_DB: test_db
|
||||||
|
volumes:
|
||||||
|
- postgres_data:/var/lib/postgresql/data
|
||||||
|
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||||
|
ports:
|
||||||
|
- "5432:5432"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
|
||||||
|
|
||||||
|
pgadmin:
|
||||||
|
container_name: pgadmin4
|
||||||
|
image: dpage/pgadmin4
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
||||||
|
PGADMIN_DEFAULT_PASSWORD: root
|
||||||
|
volumes:
|
||||||
|
- pgadmin-data:/var/lib/pgadmin
|
||||||
|
ports:
|
||||||
|
- "5050:80"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
|
||||||
|
|
||||||
|
store:
|
||||||
|
container_name: store
|
||||||
|
build: ../../store
|
||||||
|
depends_on:
|
||||||
|
- postgres_db
|
||||||
|
restart: always
|
||||||
|
environment:
|
||||||
|
POSTGRES_USER: user
|
||||||
|
POSTGRES_PASSWORD: pass
|
||||||
|
POSTGRES_DB: test_db
|
||||||
|
POSTGRES_HOST: postgres_db
|
||||||
|
POSTGRES_PORT: 5432
|
||||||
|
ports:
|
||||||
|
- "8000:8000"
|
||||||
|
networks:
|
||||||
|
db_network:
|
||||||
|
hub_store:
|
||||||
|
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis:latest
|
||||||
|
container_name: redis
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
networks:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
hub:
|
||||||
|
container_name: hub
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
- redis
|
||||||
|
- store
|
||||||
|
environment:
|
||||||
|
STORE_API_HOST: "store"
|
||||||
|
STORE_API_PORT: 8000
|
||||||
|
REDIS_HOST: "redis"
|
||||||
|
REDIS_PORT: 6379
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "processed_data_topic"
|
||||||
|
BATCH_SIZE: 1
|
||||||
|
ports:
|
||||||
|
- "9000:8000"
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
db_network:
|
||||||
|
hub_store:
|
||||||
|
hub_redis:
|
||||||
|
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
pgadmin-data:
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
persistence true
|
||||||
|
persistence_location /mosquitto/data/
|
||||||
|
listener 1883
|
||||||
|
## Authentication ##
|
||||||
|
allow_anonymous true
|
||||||
|
# allow_anonymous false
|
||||||
|
# password_file /mosquitto/config/password.txt
|
||||||
|
## Log ##
|
||||||
|
log_dest file /mosquitto/log/mosquitto.log
|
||||||
|
log_dest stdout
|
||||||
|
# listener 1883
|
||||||
+96
@@ -0,0 +1,96 @@
|
|||||||
|
import logging
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from redis import Redis
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
from config import (
|
||||||
|
STORE_API_BASE_URL,
|
||||||
|
REDIS_HOST,
|
||||||
|
REDIS_PORT,
|
||||||
|
BATCH_SIZE,
|
||||||
|
MQTT_TOPIC,
|
||||||
|
MQTT_BROKER_HOST,
|
||||||
|
MQTT_BROKER_PORT,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Configure logging settings
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
||||||
|
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(), # Output log messages to the console
|
||||||
|
logging.FileHandler("app.log"), # Save log messages to a file
|
||||||
|
],
|
||||||
|
)
|
||||||
|
# Create an instance of the Redis using the configuration
|
||||||
|
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
|
||||||
|
# Create an instance of the StoreApiAdapter using the configuration
|
||||||
|
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
|
||||||
|
# Create an instance of the AgentMQTTAdapter using the configuration
|
||||||
|
|
||||||
|
# FastAPI
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/processed_agent_data/")
|
||||||
|
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
|
||||||
|
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
|
||||||
|
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||||
|
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||||
|
for _ in range(BATCH_SIZE):
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
redis_client.lpop("processed_agent_data")
|
||||||
|
)
|
||||||
|
processed_agent_data_batch.append(processed_agent_data)
|
||||||
|
print(processed_agent_data_batch)
|
||||||
|
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
|
||||||
|
# MQTT
|
||||||
|
client = mqtt.Client()
|
||||||
|
|
||||||
|
|
||||||
|
def on_connect(client, userdata, flags, rc):
|
||||||
|
if rc == 0:
|
||||||
|
logging.info("Connected to MQTT broker")
|
||||||
|
client.subscribe(MQTT_TOPIC)
|
||||||
|
else:
|
||||||
|
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
||||||
|
|
||||||
|
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
try:
|
||||||
|
payload: str = msg.payload.decode("utf-8")
|
||||||
|
# Create ProcessedAgentData instance with the received data
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
payload, strict=True
|
||||||
|
)
|
||||||
|
|
||||||
|
redis_client.lpush(
|
||||||
|
"processed_agent_data", processed_agent_data.model_dump_json()
|
||||||
|
)
|
||||||
|
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||||
|
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||||
|
for _ in range(BATCH_SIZE):
|
||||||
|
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||||
|
redis_client.lpop("processed_agent_data")
|
||||||
|
)
|
||||||
|
processed_agent_data_batch.append(processed_agent_data)
|
||||||
|
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||||
|
return {"status": "ok"}
|
||||||
|
except Exception as e:
|
||||||
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# Connect
|
||||||
|
client.on_connect = on_connect
|
||||||
|
client.on_message = on_message
|
||||||
|
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
|
||||||
|
|
||||||
|
# Start
|
||||||
|
client.loop_start()
|
||||||
Binary file not shown.
@@ -0,0 +1,60 @@
|
|||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock
|
||||||
|
import redis
|
||||||
|
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||||
|
from app.interfaces.store_gateway import StoreGateway
|
||||||
|
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||||
|
from app.usecases.data_processing import process_agent_data_batch
|
||||||
|
|
||||||
|
class TestAgentMQTTAdapter(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
# Create a mock StoreGateway for testing
|
||||||
|
self.mock_store_gateway = Mock(spec=StoreGateway)
|
||||||
|
self.mock_redis = Mock(spec=redis.Redis)
|
||||||
|
# Create the AgentMQTTAdapter instance with the mock StoreGateway
|
||||||
|
self.agent_adapter = AgentMQTTAdapter(
|
||||||
|
broker_host="test_broker",
|
||||||
|
broker_port=1234,
|
||||||
|
topic="test_topic",
|
||||||
|
store_gateway=self.mock_store_gateway,
|
||||||
|
redis_client=self.mock_redis,
|
||||||
|
batch_size=1,
|
||||||
|
)
|
||||||
|
def test_on_message_valid_data(self):
|
||||||
|
# Test handling of valid incoming MQTT message
|
||||||
|
# (Assuming data is in the correct JSON format)
|
||||||
|
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
|
||||||
|
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
|
||||||
|
self.mock_redis.llen.return_value = 1
|
||||||
|
self.mock_redis.rpop.return_value = valid_json_data
|
||||||
|
# Call on_message with the mock message
|
||||||
|
self.agent_adapter.on_message(None, None, mock_msg)
|
||||||
|
# Ensure that the store_gateway's save_data method is called once with the correct arguments
|
||||||
|
expected_agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
self.mock_store_gateway.save_data.assert_called_once_with(
|
||||||
|
process_agent_data_batch([expected_agent_data])
|
||||||
|
)
|
||||||
|
def test_on_message_invalid_data(self):
|
||||||
|
# Test handling of invalid incoming MQTT message
|
||||||
|
# (Assuming data is missing required fields or has incorrect format)
|
||||||
|
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
|
||||||
|
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
|
||||||
|
# Call on_message with the mock message
|
||||||
|
self.agent_adapter.on_message(None, None, mock_msg)
|
||||||
|
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
|
||||||
|
self.mock_store_gateway.save_data.assert_not_called()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,72 @@
|
|||||||
|
import requests
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||||
|
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||||
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
class TestStoreApiAdapter(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
# Create the StoreApiAdapter instance
|
||||||
|
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
|
||||||
|
@patch.object(requests, "post")
|
||||||
|
def test_save_data_success(self, mock_post):
|
||||||
|
# Test successful saving of data to the Store API
|
||||||
|
# Sample processed road data
|
||||||
|
agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
||||||
|
# Mock the response from the Store API
|
||||||
|
mock_response = Mock(status_code=201) # 201 indicates successful creation
|
||||||
|
mock_post.return_value = mock_response
|
||||||
|
# Call the save_data method
|
||||||
|
result = self.store_api_adapter.save_data(processed_data)
|
||||||
|
# Ensure that the post method of the mock is called with the correct arguments
|
||||||
|
mock_post.assert_called_once_with(
|
||||||
|
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
||||||
|
)
|
||||||
|
# Ensure that the result is True, indicating successful saving
|
||||||
|
self.assertTrue(result)
|
||||||
|
@patch.object(requests, "post")
|
||||||
|
def test_save_data_failure(self, mock_post):
|
||||||
|
# Test failure to save data to the Store API
|
||||||
|
# Sample processed road data
|
||||||
|
agent_data = AgentData(
|
||||||
|
user_id=1,
|
||||||
|
accelerometer=AccelerometerData(
|
||||||
|
x=0.1,
|
||||||
|
y=0.2,
|
||||||
|
z=0.3,
|
||||||
|
),
|
||||||
|
gps=GpsData(
|
||||||
|
latitude=10.123,
|
||||||
|
longitude=20.456,
|
||||||
|
),
|
||||||
|
timestamp="2023-07-21T12:34:56Z",
|
||||||
|
)
|
||||||
|
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
|
||||||
|
# Mock the response from the Store API
|
||||||
|
mock_response = Mock(status_code=400) # 400 indicates a client error
|
||||||
|
mock_post.return_value = mock_response
|
||||||
|
# Call the save_data method
|
||||||
|
result = self.store_api_adapter.save_data(processed_data)
|
||||||
|
# Ensure that the post method of the mock is called with the correct arguments
|
||||||
|
mock_post.assert_called_once_with(
|
||||||
|
"http://test-api.com/agent_data", json=processed_data.model_dump()
|
||||||
|
)
|
||||||
|
# Ensure that the result is False, indicating failure to save
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user