Merge from upstream 'github/dev' into dev
All checks were successful
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m14s

This commit is contained in:
2026-03-24 15:21:56 +02:00
9 changed files with 60 additions and 67 deletions

View File

@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None
USER_ID = 1
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@@ -1,6 +1,7 @@
from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import logging
import config
@@ -28,6 +29,7 @@ def publish(client, topic, datasource):
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
logging.info(f"Published to {topic}: {msg[:50]}...")
status = result[0]
if status != 0:
print(f"Failed to send message to topic {topic}")

View File

@@ -22,6 +22,7 @@ services:
depends_on:
- mqtt
environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
@@ -39,9 +40,10 @@ services:
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " "
HUB_HOST: "store"
MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "hub"
HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
@@ -89,6 +91,7 @@ services:
- postgres_db
restart: always
environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
@@ -120,6 +123,7 @@ services:
- redis
- store
environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"

View File

@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
broker_port,
topic,
hub_gateway: HubGateway,
batch_size=10,
):
self.batch_size = batch_size
# MQTT
self.broker_host = broker_host
self.broker_port = broker_port
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway"""
try:
payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available")
if self.hub_gateway.save_data(processed_data):
logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")
logging.error(f"Error processing MQTT message: {e}")
def connect(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60)
def start(self):
self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")
def loop_forever(self):
self.client.loop_forever()

View File

@@ -14,6 +14,7 @@ class GpsData(BaseModel):
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime

View File

@@ -26,15 +26,8 @@ class AgentGateway(ABC):
pass
@abstractmethod
def start(self):
def loop_forever(self):
"""
Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
Method to await for new messages.
"""
pass

View File

@@ -13,3 +13,7 @@ def process_agent_data(
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
"""
# Implement it
return ProcessedAgentData(
road_state="normal",
agent_data=agent_data
)

View File

@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
# Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"

View File

@@ -10,42 +10,51 @@ from config import (
HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
)
if __name__ == "__main__":
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
logging.StreamHandler(),
logging.FileHandler("app.log"),
],
)
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter(
# api_base_url=HUB_URL,
# )
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the configuration
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# This allows easy switching between HTTP and MQTT protocols
if HUB_CONNECTION_TYPE.lower() == "http":
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
hub_adapter = HubHttpAdapter(
api_base_url=HUB_URL,
)
else:
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC,
hub_gateway=hub_adapter,
)
try:
# Connect to the MQTT broker and start listening for messages
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed)
while True:
pass
logging.info("Broker connection success. Waiting for data...")
agent_adapter.loop_forever()
except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.stop()
logging.info("System stopped.")
logging.info("Interrupt signal received. Shutting down...")
agent_adapter.disconnect()
logging.info("Disconnected from MQTT broker.")