Compare commits

..

4 Commits

Author SHA1 Message Date
hasslesstech 72f37edcd4 [P] Move diagrams into a dedicated directory
Component testing / Hub testing (push) Successful in 29s
Component testing / Store testing (push) Successful in 28s
Component testing / Integration smoke testing (push) Successful in 2m51s
2026-03-23 22:44:40 +02:00
hasslesstech d709def150 [P] Convert use case diagram files to PlantUML format 2026-03-23 22:44:40 +02:00
hasslesstech c3dffb0d76 [P] Convert sequence diagram files to PlantUML format 2026-03-23 22:44:40 +02:00
bacant150 153e0592e6 Add use-case and sequence diagrams 2026-03-23 22:44:40 +02:00
13 changed files with 133 additions and 70 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 0.1) Clock.schedule_interval(self.update, 5)
def update(self, *args): def update(self, *args):
""" """
+1 -1
View File
@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1 USER_ID = 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
-2
View File
@@ -1,7 +1,6 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import logging
import config import config
@@ -29,7 +28,6 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.info(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") print(f"Failed to send message to topic {topic}")
+30
View File
@@ -0,0 +1,30 @@
@startuml
participant Agent as agent
participant "MQTT Broker (raw\ntopic)" as mqtt1
participant "Edge Service" as edge
participant "MQTT\nBroker (processed\ntopic)" as mqtt2
participant "Hub Service" as hub
participant "Redis" as redis
participant "Store API" as store
participant "PostgreSQL" as db
participant "MapView Client" as mapview
agent -> mqtt1 : Publish raw\ntelemetry
mqtt1 -> edge : Deliver raw\nmessage
edge -> edge : Validate\nAgentData
edge -> edge : Process\ntelemetry
edge -> mqtt2 : Publish\nprocessed data
mqtt2 -> hub : Deliver processed\nmessage
hub -> hub : Validate\nProcessedAgentData
hub -> redis : LPUSH to buffer
hub -> redis : LPOP batch item
redis -> hub : Return item
hub -> store : POST batch
store -> db : INSERT records
db --> store : Return created\nrecords
store -> mapview : WebSocket push
mapview -> mapview : Sort by timestamp
mapview -> mapview : Update vehicle\nmarker
mapview -> mapview : Add pothole/bump\nmarker
store --> hub : Success response
@enduml
+23
View File
@@ -0,0 +1,23 @@
@startuml
rectangle IoT-Systems {
usecase "Collect telemetry (accelerometer + GPS)" as uc1
usecase "Send telemetry" as uc2
usecase "Process telemetry" as uc3
usecase "Determine road condition (pothole / bump /\nnormal)" as uc4
usecase "View road defect marks" as uc5
usecase "View route on map" as uc6
}
rectangle "The user is the card operator" as uc10
rectangle "Sensor Agent\n(Device/STM32/Emulator)" as uc11
uc11 - uc1
uc11 - uc2
uc10 - uc5
uc10 - uc6
uc2 -.|> uc3 : <<include>>
uc3 -.|> uc4 : <<include>>
@enduml
+2 -6
View File
@@ -22,7 +22,6 @@ services:
depends_on: depends_on:
- mqtt - mqtt
environment: environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: "agent_data_topic"
@@ -40,10 +39,9 @@ services:
environment: environment:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: " "
HUB_HOST: "hub" HUB_HOST: "store"
HUB_PORT: 8000 HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic" HUB_MQTT_TOPIC: "processed_data_topic"
@@ -91,7 +89,6 @@ services:
- postgres_db - postgres_db
restart: always restart: always
environment: environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user POSTGRES_USER: user
POSTGRES_PASSWORD: pass POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db POSTGRES_DB: test_db
@@ -123,7 +120,6 @@ services:
- redis - redis
- store - store
environment: environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store" STORE_API_HOST: "store"
STORE_API_PORT: 8000 STORE_API_PORT: 8000
REDIS_HOST: "redis" REDIS_HOST: "redis"
+32 -9
View File
@@ -13,7 +13,9 @@ class AgentMQTTAdapter(AgentGateway):
broker_port, broker_port,
topic, topic,
hub_gateway: HubGateway, hub_gateway: HubGateway,
batch_size=10,
): ):
self.batch_size = batch_size
# MQTT # MQTT
self.broker_host = broker_host self.broker_host = broker_host
self.broker_port = broker_port self.broker_port = broker_port
@@ -33,21 +35,42 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway""" """Processing agent data and sent it to hub gateway"""
try: try:
payload: str = msg.payload.decode("utf-8") payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True) agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data) processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if self.hub_gateway.save_data(processed_data): if not self.hub_gateway.save_data(processed_data):
logging.info("Processed data successfully forwarded to the Hub.") logging.error("Hub is not available")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e: except Exception as e:
logging.error(f"Error processing MQTT message: {e}") logging.info(f"Error processing MQTT message: {e}")
def connect(self): def connect(self):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60) self.client.connect(self.broker_host, self.broker_port, 60)
def loop_forever(self): def start(self):
self.client.loop_forever() self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")
-1
View File
@@ -14,7 +14,6 @@ class GpsData(BaseModel):
class AgentData(BaseModel): class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData accelerometer: AccelerometerData
gps: GpsData gps: GpsData
timestamp: datetime timestamp: datetime
+9 -2
View File
@@ -26,8 +26,15 @@ class AgentGateway(ABC):
pass pass
@abstractmethod @abstractmethod
def loop_forever(self): def start(self):
""" """
Method to await for new messages. Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
""" """
pass pass
-4
View File
@@ -13,7 +13,3 @@ def process_agent_data(
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it # Implement it
return ProcessedAgentData(
road_state="normal",
agent_data=agent_data
)
+2 -5
View File
@@ -16,12 +16,9 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT # Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic" HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
# Configuration for the Hub # Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost" HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000 HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"
+16 -25
View File
@@ -10,51 +10,42 @@ from config import (
HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT, HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC, HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
) )
if __name__ == "__main__": if __name__ == "__main__":
# Configure logging settings # Configure logging settings
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[ handlers=[
logging.StreamHandler(), logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), logging.FileHandler("app.log"), # Save log messages to a file
], ],
) )
# Create an instance of the StoreApiAdapter using the configuration
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94) # hub_adapter = HubHttpAdapter(
# This allows easy switching between HTTP and MQTT protocols # api_base_url=HUB_URL,
if HUB_CONNECTION_TYPE.lower() == "http": # )
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
hub_adapter = HubHttpAdapter(
api_base_url=HUB_URL,
)
else:
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter( hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST, broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT, port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC, topic=HUB_MQTT_TOPIC,
) )
# Create an instance of the AgentMQTTAdapter using the configuration
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter( agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST, broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT, broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC, topic=MQTT_TOPIC,
hub_gateway=hub_adapter, hub_gateway=hub_adapter,
) )
try: try:
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") # Connect to the MQTT broker and start listening for messages
agent_adapter.connect() agent_adapter.connect()
agent_adapter.start()
logging.info("Broker connection success. Waiting for data...") # Keep the system running indefinitely (you can add other logic as needed)
agent_adapter.loop_forever() while True:
pass
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Interrupt signal received. Shutting down...") # Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.disconnect() agent_adapter.stop()
logging.info("Disconnected from MQTT broker.") logging.info("System stopped.")
+12 -9
View File
@@ -33,7 +33,7 @@ processed_agent_data = Table(
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Set[WebSocket] = set() subscriptions: Dict[int, Set[WebSocket]] = {}
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,7 +41,10 @@ subscriptions: Set[WebSocket] = set()
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
subscriptions.add(websocket) if user_id not in subscriptions:
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
@@ -52,20 +55,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
for i in jsonable_data: await websocket.send_json(json.dumps(jsonable_data))
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions.remove(websocket) subscriptions[user_id].remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(data): async def send_data_to_subscribers(user_id: int, data):
for websocket in subscriptions: if user_id in subscriptions:
await websocket.send_json(json.dumps([data])) for websocket in subscriptions[user_id]:
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -97,7 +100,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit() session.commit()
for record in created_records: for record in created_records:
await send_data_to_subscribers(jsonable_encoder(record)) await send_data_to_subscribers(user_id, jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()