From c085a49c8cc720376f6cd0378ff33d9cf5063aea Mon Sep 17 00:00:00 2001 From: esk4nz Date: Mon, 23 Mar 2026 21:31:31 +0200 Subject: [PATCH] implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94) - Agent: Updated config and main - Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP. - Edge: Updated AgentData entity and processing logic to support user_id. - Infrastructure: Configured docker-compose for dynamic protocol switching and environment management. --- agent/src/config.py | 2 +- agent/src/main.py | 1 + docker-compose.yaml | 11 ++++-- edge/app/adapters/agent_mqtt_adapter.py | 7 ++-- edge/app/entities/agent_data.py | 1 + edge/app/usecases/data_processing.py | 8 +++++ edge/config.py | 7 ++-- edge/main.py | 45 ++++++++++++++++--------- 8 files changed, 59 insertions(+), 23 deletions(-) diff --git a/agent/src/config.py b/agent/src/config.py index f507c71..c5ae3f3 100644 --- a/agent/src/config.py +++ b/agent/src/config.py @@ -8,7 +8,7 @@ def try_parse(type, value: str): return None -USER_ID = 1 +USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1 # MQTT config MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 diff --git a/agent/src/main.py b/agent/src/main.py index ce17d6b..65380af 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -28,6 +28,7 @@ def publish(client, topic, datasource): data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) + print(f"Published to {topic}: {msg[:50]}...") status = result[0] if status != 0: print(f"Failed to send message to topic {topic}") diff --git a/docker-compose.yaml b/docker-compose.yaml index 54cdbbe..af411cd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -22,6 +22,7 @@ services: depends_on: - mqtt environment: + PYTHONUNBUFFERED: 1 MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_PORT: 1883 MQTT_TOPIC: "agent_data_topic" @@ -37,11 +38,13 @@ services: depends_on: - mqtt environment: + PYTHONUNBUFFERED: 1 MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_PORT: 1883 - MQTT_TOPIC: " " - HUB_HOST: "store" + MQTT_TOPIC: "agent_data_topic" + HUB_HOST: "hub" HUB_PORT: 8000 + HUB_CONNECTION_TYPE: "http" HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_TOPIC: "processed_data_topic" @@ -54,6 +57,7 @@ services: container_name: postgres_db restart: always environment: + PYTHONUNBUFFERED: 1 POSTGRES_USER: user POSTGRES_PASSWORD: pass POSTGRES_DB: test_db @@ -70,6 +74,7 @@ services: image: dpage/pgadmin4 restart: always environment: + PYTHONUNBUFFERED: 1 PGADMIN_DEFAULT_EMAIL: admin@admin.com PGADMIN_DEFAULT_PASSWORD: root volumes: @@ -89,6 +94,7 @@ services: - postgres_db restart: always environment: + PYTHONUNBUFFERED: 1 POSTGRES_USER: user POSTGRES_PASSWORD: pass POSTGRES_DB: test_db @@ -120,6 +126,7 @@ services: - redis - store environment: + PYTHONUNBUFFERED: 1 STORE_API_HOST: "store" STORE_API_PORT: 8000 REDIS_HOST: "redis" diff --git a/edge/app/adapters/agent_mqtt_adapter.py b/edge/app/adapters/agent_mqtt_adapter.py index 855370f..2f4597d 100644 --- a/edge/app/adapters/agent_mqtt_adapter.py +++ b/edge/app/adapters/agent_mqtt_adapter.py @@ -40,8 +40,11 @@ class AgentMQTTAdapter(AgentGateway): # Process the received data (you can call a use case here if needed) processed_data = process_agent_data(agent_data) # Store the agent_data in the database (you can send it to the data processing module) - if not self.hub_gateway.save_data(processed_data): - logging.error("Hub is not available") + if processed_data is not None: + if not self.hub_gateway.save_data(processed_data): + logging.error("Hub is not available") + else: + logging.info("Road is fine, no data sent to hub.") except Exception as e: logging.info(f"Error processing MQTT message: {e}") diff --git a/edge/app/entities/agent_data.py b/edge/app/entities/agent_data.py index 3f92a8b..fdb2b5e 100644 --- a/edge/app/entities/agent_data.py +++ b/edge/app/entities/agent_data.py @@ -14,6 +14,7 @@ class GpsData(BaseModel): class AgentData(BaseModel): + user_id: int accelerometer: AccelerometerData gps: GpsData timestamp: datetime diff --git a/edge/app/usecases/data_processing.py b/edge/app/usecases/data_processing.py index 712643c..bc3d9b1 100644 --- a/edge/app/usecases/data_processing.py +++ b/edge/app/usecases/data_processing.py @@ -13,3 +13,11 @@ def process_agent_data( processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. """ # Implement it + + if not hasattr(agent_data, 'user_id') or agent_data.user_id is None: + agent_data.user_id = 1 + + return ProcessedAgentData( + road_state="normal", + agent_data=agent_data + ) diff --git a/edge/config.py b/edge/config.py index b92d0c3..ee6efa3 100644 --- a/edge/config.py +++ b/edge/config.py @@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic" # Configuration for hub MQTT HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 -HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" +HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic" # Configuration for the Hub HUB_HOST = os.environ.get("HUB_HOST") or "localhost" -HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 +HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000 HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" + +# For choosing type of connection +HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt" \ No newline at end of file diff --git a/edge/main.py b/edge/main.py index b2166a2..4c38175 100644 --- a/edge/main.py +++ b/edge/main.py @@ -1,4 +1,5 @@ import logging +import os from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter from app.adapters.hub_http_adapter import HubHttpAdapter from app.adapters.hub_mqtt_adapter import HubMqttAdapter @@ -10,42 +11,54 @@ from config import ( HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_PORT, HUB_MQTT_TOPIC, + HUB_CONNECTION_TYPE, ) if __name__ == "__main__": # Configure logging settings logging.basicConfig( - level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) + level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", handlers=[ - logging.StreamHandler(), # Output log messages to the console - logging.FileHandler("app.log"), # Save log messages to a file + logging.StreamHandler(), + logging.FileHandler("app.log"), ], ) - # Create an instance of the StoreApiAdapter using the configuration - # hub_adapter = HubHttpAdapter( - # api_base_url=HUB_URL, - # ) - hub_adapter = HubMqttAdapter( - broker=HUB_MQTT_BROKER_HOST, - port=HUB_MQTT_BROKER_PORT, - topic=HUB_MQTT_TOPIC, - ) - # Create an instance of the AgentMQTTAdapter using the configuration + + # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94) + # This allows easy switching between HTTP and MQTT protocols + if HUB_CONNECTION_TYPE.lower() == "http": + logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)") + hub_adapter = HubHttpAdapter( + api_base_url=HUB_URL, + ) + else: + logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)") + hub_adapter = HubMqttAdapter( + broker=HUB_MQTT_BROKER_HOST, + port=HUB_MQTT_BROKER_PORT, + topic=HUB_MQTT_TOPIC, + ) + + # Create an instance of the AgentMQTTAdapter using the selected hub adapter + # This adapter acts as a bridge between the Agent and the Hub agent_adapter = AgentMQTTAdapter( broker_host=MQTT_BROKER_HOST, broker_port=MQTT_BROKER_PORT, topic=MQTT_TOPIC, hub_gateway=hub_adapter, ) + try: - # Connect to the MQTT broker and start listening for messages + logging.info(f"Starting Edge module. Connecting to Agent Broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") + # Connect to the MQTT broker and start listening for messages from Agent agent_adapter.connect() agent_adapter.start() - # Keep the system running indefinitely (you can add other logic as needed) + + # Keep the system running indefinitely to process incoming data streams while True: pass except KeyboardInterrupt: # Stop the MQTT adapter and exit gracefully if interrupted by the user agent_adapter.stop() - logging.info("System stopped.") + logging.info("System stopped.") \ No newline at end of file