Compare commits

...

25 Commits

Author SHA1 Message Date
db63eb6d79 [P] Improve logging logic in agent
All checks were successful
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
77d6968297 [P] Fix map lat, lon confusion
All checks were successful
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus
65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko
d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko
2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus
38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
764fb77f27 Merge from upstream 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m14s
2026-03-24 15:21:56 +02:00
VladiusVostokus
a55fc17711 Merge pull request #27 from Rhinemann/lab4/hrynko-SCRUM-80-hubadapter
implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
2026-03-24 13:20:39 +00:00
b34e385128 [L4] Remove irrelevant environment variables 2026-03-24 14:42:13 +02:00
a8a0ef5e15 [L4] Remove unused import 2026-03-24 14:09:53 +02:00
00b037a243 [L4] Remove uncessecary environment variable left over after print -> logging usage switch 2026-03-24 14:07:49 +02:00
d1b6c0eed1 [L4] Fix logging level on error message 2026-03-24 14:03:29 +02:00
5e890d4f03 [L4] Remove obvious single code line comments to reduce risk of misleading comments in the future 2026-03-24 14:02:03 +02:00
a8e50d0386 [L4] Remove excessive library import and clean up edge codebase 2026-03-24 13:58:32 +02:00
1b42be264d [L4] Remove misleading batch_size parameter from AgentMQTTAdapter 2026-03-24 13:37:33 +02:00
esk4nz
b12bdc334c fix: improve logging clarity and ensure data delivery in AgentMQTTAdapter 2026-03-23 23:41:25 +02:00
esk4nz
e8ff1c6cbd Replaced busy-wait loop with threading.Event to fix 100% CPU load 2026-03-23 23:28:17 +02:00
esk4nz
ad70519f47 SCRUM-80
Removed manual user_id fallback in data processing
2026-03-23 23:00:19 +02:00
esk4nz
b10aec1020 SCRUM-80
Changed print to logging in agent main
2026-03-23 22:57:15 +02:00
esk4nz
c085a49c8c implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main
- Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP.
- Edge: Updated AgentData entity and processing logic to support user_id.
- Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
2026-03-23 21:31:31 +02:00
16 changed files with 112 additions and 99 deletions

View File

@@ -72,8 +72,8 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.latitude,
processed_agent_data.longitude, processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id processed_agent_data.user_id
) )

View File

@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float

View File

@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 5) Clock.schedule_interval(self.update, 0.1)
def update(self, *args): def update(self, *args):
""" """
@@ -87,7 +87,8 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon self.car_markers[user_id].lon = lon
self.mapview.center_on(lat, lon) if user_id == 1:
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):
if isinstance(point, dict): if isinstance(point, dict):

View File

@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = 1 USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float

View File

@@ -15,6 +15,7 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row return row
@staticmethod def _parse_acc(self, row: List[str]) -> Accelerometer:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) x = int(row[0]) / self.acc_divisor
y = int(row[1]) y = int(row[1]) / self.acc_divisor
z = int(row[2]) z = int(row[2]) / self.acc_divisor
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -1,6 +1,7 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import logging
import config import config
@@ -15,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -28,16 +31,18 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") logging.error(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Int() x = fields.Float()
y = fields.Int() y = fields.Float()
z = fields.Int() z = fields.Float()

View File

@@ -22,6 +22,7 @@ services:
depends_on: depends_on:
- mqtt - mqtt
environment: environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: "agent_data_topic"
@@ -39,9 +40,10 @@ services:
environment: environment:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " " MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "store" HUB_HOST: "hub"
HUB_PORT: 8000 HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic" HUB_MQTT_TOPIC: "processed_data_topic"
@@ -89,6 +91,7 @@ services:
- postgres_db - postgres_db
restart: always restart: always
environment: environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user POSTGRES_USER: user
POSTGRES_PASSWORD: pass POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db POSTGRES_DB: test_db
@@ -120,6 +123,7 @@ services:
- redis - redis
- store - store
environment: environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store" STORE_API_HOST: "store"
STORE_API_PORT: 8000 STORE_API_PORT: 8000
REDIS_HOST: "redis" REDIS_HOST: "redis"

View File

@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
broker_port, broker_port,
topic, topic,
hub_gateway: HubGateway, hub_gateway: HubGateway,
batch_size=10,
): ):
self.batch_size = batch_size
# MQTT # MQTT
self.broker_host = broker_host self.broker_host = broker_host
self.broker_port = broker_port self.broker_port = broker_port
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway""" """Processing agent data and sent it to hub gateway"""
try: try:
payload: str = msg.payload.decode("utf-8") payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True) agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data) processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data): if self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available") logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e: except Exception as e:
logging.info(f"Error processing MQTT message: {e}") logging.error(f"Error processing MQTT message: {e}")
def connect(self): def connect(self):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60) self.client.connect(self.broker_host, self.broker_port, 60)
def start(self): def loop_forever(self):
self.client.loop_start() self.client.loop_forever()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")

View File

@@ -14,6 +14,7 @@ class GpsData(BaseModel):
class AgentData(BaseModel): class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData accelerometer: AccelerometerData
gps: GpsData gps: GpsData
timestamp: datetime timestamp: datetime

View File

@@ -26,15 +26,8 @@ class AgentGateway(ABC):
pass pass
@abstractmethod @abstractmethod
def start(self): def loop_forever(self):
""" """
Method to start listening for messages from the agent. Method to await for new messages.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
""" """
pass pass

View File

@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -12,4 +13,24 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)

View File

@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT # Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
# Configuration for the Hub # Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost" HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"

View File

@@ -10,42 +10,51 @@ from config import (
HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT, HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC, HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
) )
if __name__ == "__main__": if __name__ == "__main__":
# Configure logging settings # Configure logging settings
logging.basicConfig( logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[ handlers=[
logging.StreamHandler(), # Output log messages to the console logging.StreamHandler(),
logging.FileHandler("app.log"), # Save log messages to a file logging.FileHandler("app.log"),
], ],
) )
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter( # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# api_base_url=HUB_URL, # This allows easy switching between HTTP and MQTT protocols
# ) if HUB_CONNECTION_TYPE.lower() == "http":
hub_adapter = HubMqttAdapter( logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
broker=HUB_MQTT_BROKER_HOST, hub_adapter = HubHttpAdapter(
port=HUB_MQTT_BROKER_PORT, api_base_url=HUB_URL,
topic=HUB_MQTT_TOPIC, )
) else:
# Create an instance of the AgentMQTTAdapter using the configuration logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter( agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST, broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT, broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC, topic=MQTT_TOPIC,
hub_gateway=hub_adapter, hub_gateway=hub_adapter,
) )
try: try:
# Connect to the MQTT broker and start listening for messages logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
agent_adapter.connect() agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed) logging.info("Broker connection success. Waiting for data...")
while True: agent_adapter.loop_forever()
pass
except KeyboardInterrupt: except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user logging.info("Interrupt signal received. Shutting down...")
agent_adapter.stop() agent_adapter.disconnect()
logging.info("System stopped.") logging.info("Disconnected from MQTT broker.")

View File

@@ -33,7 +33,7 @@ processed_agent_data = Table(
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {} subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
if user_id not in subscriptions: subscriptions.add(websocket)
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
await websocket.send_json(json.dumps(jsonable_data)) for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions[user_id].remove(websocket) subscriptions.remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data): async def send_data_to_subscribers(data):
if user_id in subscriptions: for websocket in subscriptions:
for websocket in subscriptions[user_id]: await websocket.send_json(json.dumps([data]))
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -100,7 +97,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit() session.commit()
for record in created_records: for record in created_records:
await send_data_to_subscribers(user_id, jsonable_encoder(record)) await send_data_to_subscribers(jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()