implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main - Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP. - Edge: Updated AgentData entity and processing logic to support user_id. - Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
This commit is contained in:
@@ -8,7 +8,7 @@ def try_parse(type, value: str):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
USER_ID = 1
|
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
|
||||||
# MQTT config
|
# MQTT config
|
||||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ def publish(client, topic, datasource):
|
|||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
|
print(f"Published to {topic}: {msg[:50]}...")
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status != 0:
|
if status != 0:
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
MQTT_BROKER_PORT: 1883
|
MQTT_BROKER_PORT: 1883
|
||||||
MQTT_TOPIC: "agent_data_topic"
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
@@ -37,11 +38,13 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- mqtt
|
- mqtt
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
MQTT_BROKER_HOST: "mqtt"
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
MQTT_BROKER_PORT: 1883
|
MQTT_BROKER_PORT: 1883
|
||||||
MQTT_TOPIC: " "
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
HUB_HOST: "store"
|
HUB_HOST: "hub"
|
||||||
HUB_PORT: 8000
|
HUB_PORT: 8000
|
||||||
|
HUB_CONNECTION_TYPE: "http"
|
||||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||||
HUB_MQTT_BROKER_PORT: 1883
|
HUB_MQTT_BROKER_PORT: 1883
|
||||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||||
@@ -54,6 +57,7 @@ services:
|
|||||||
container_name: postgres_db
|
container_name: postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
POSTGRES_USER: user
|
POSTGRES_USER: user
|
||||||
POSTGRES_PASSWORD: pass
|
POSTGRES_PASSWORD: pass
|
||||||
POSTGRES_DB: test_db
|
POSTGRES_DB: test_db
|
||||||
@@ -70,6 +74,7 @@ services:
|
|||||||
image: dpage/pgadmin4
|
image: dpage/pgadmin4
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
||||||
PGADMIN_DEFAULT_PASSWORD: root
|
PGADMIN_DEFAULT_PASSWORD: root
|
||||||
volumes:
|
volumes:
|
||||||
@@ -89,6 +94,7 @@ services:
|
|||||||
- postgres_db
|
- postgres_db
|
||||||
restart: always
|
restart: always
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
POSTGRES_USER: user
|
POSTGRES_USER: user
|
||||||
POSTGRES_PASSWORD: pass
|
POSTGRES_PASSWORD: pass
|
||||||
POSTGRES_DB: test_db
|
POSTGRES_DB: test_db
|
||||||
@@ -120,6 +126,7 @@ services:
|
|||||||
- redis
|
- redis
|
||||||
- store
|
- store
|
||||||
environment:
|
environment:
|
||||||
|
PYTHONUNBUFFERED: 1
|
||||||
STORE_API_HOST: "store"
|
STORE_API_HOST: "store"
|
||||||
STORE_API_PORT: 8000
|
STORE_API_PORT: 8000
|
||||||
REDIS_HOST: "redis"
|
REDIS_HOST: "redis"
|
||||||
|
|||||||
@@ -40,8 +40,11 @@ class AgentMQTTAdapter(AgentGateway):
|
|||||||
# Process the received data (you can call a use case here if needed)
|
# Process the received data (you can call a use case here if needed)
|
||||||
processed_data = process_agent_data(agent_data)
|
processed_data = process_agent_data(agent_data)
|
||||||
# Store the agent_data in the database (you can send it to the data processing module)
|
# Store the agent_data in the database (you can send it to the data processing module)
|
||||||
if not self.hub_gateway.save_data(processed_data):
|
if processed_data is not None:
|
||||||
logging.error("Hub is not available")
|
if not self.hub_gateway.save_data(processed_data):
|
||||||
|
logging.error("Hub is not available")
|
||||||
|
else:
|
||||||
|
logging.info("Road is fine, no data sent to hub.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.info(f"Error processing MQTT message: {e}")
|
logging.info(f"Error processing MQTT message: {e}")
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ class GpsData(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class AgentData(BaseModel):
|
class AgentData(BaseModel):
|
||||||
|
user_id: int
|
||||||
accelerometer: AccelerometerData
|
accelerometer: AccelerometerData
|
||||||
gps: GpsData
|
gps: GpsData
|
||||||
timestamp: datetime
|
timestamp: datetime
|
||||||
|
|||||||
@@ -13,3 +13,11 @@ def process_agent_data(
|
|||||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||||
"""
|
"""
|
||||||
# Implement it
|
# Implement it
|
||||||
|
|
||||||
|
if not hasattr(agent_data, 'user_id') or agent_data.user_id is None:
|
||||||
|
agent_data.user_id = 1
|
||||||
|
|
||||||
|
return ProcessedAgentData(
|
||||||
|
road_state="normal",
|
||||||
|
agent_data=agent_data
|
||||||
|
)
|
||||||
|
|||||||
@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
|
|||||||
# Configuration for hub MQTT
|
# Configuration for hub MQTT
|
||||||
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
||||||
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
||||||
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
|
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
|
||||||
|
|
||||||
# Configuration for the Hub
|
# Configuration for the Hub
|
||||||
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
||||||
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
|
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
|
||||||
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
||||||
|
|
||||||
|
# For choosing type of connection
|
||||||
|
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"
|
||||||
45
edge/main.py
45
edge/main.py
@@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
|
||||||
from app.adapters.hub_http_adapter import HubHttpAdapter
|
from app.adapters.hub_http_adapter import HubHttpAdapter
|
||||||
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
from app.adapters.hub_mqtt_adapter import HubMqttAdapter
|
||||||
@@ -10,42 +11,54 @@ from config import (
|
|||||||
HUB_MQTT_BROKER_HOST,
|
HUB_MQTT_BROKER_HOST,
|
||||||
HUB_MQTT_BROKER_PORT,
|
HUB_MQTT_BROKER_PORT,
|
||||||
HUB_MQTT_TOPIC,
|
HUB_MQTT_TOPIC,
|
||||||
|
HUB_CONNECTION_TYPE,
|
||||||
)
|
)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Configure logging settings
|
# Configure logging settings
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
level=logging.INFO,
|
||||||
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||||
handlers=[
|
handlers=[
|
||||||
logging.StreamHandler(), # Output log messages to the console
|
logging.StreamHandler(),
|
||||||
logging.FileHandler("app.log"), # Save log messages to a file
|
logging.FileHandler("app.log"),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
# Create an instance of the StoreApiAdapter using the configuration
|
|
||||||
# hub_adapter = HubHttpAdapter(
|
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
|
||||||
# api_base_url=HUB_URL,
|
# This allows easy switching between HTTP and MQTT protocols
|
||||||
# )
|
if HUB_CONNECTION_TYPE.lower() == "http":
|
||||||
hub_adapter = HubMqttAdapter(
|
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
|
||||||
broker=HUB_MQTT_BROKER_HOST,
|
hub_adapter = HubHttpAdapter(
|
||||||
port=HUB_MQTT_BROKER_PORT,
|
api_base_url=HUB_URL,
|
||||||
topic=HUB_MQTT_TOPIC,
|
)
|
||||||
)
|
else:
|
||||||
# Create an instance of the AgentMQTTAdapter using the configuration
|
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
|
||||||
|
hub_adapter = HubMqttAdapter(
|
||||||
|
broker=HUB_MQTT_BROKER_HOST,
|
||||||
|
port=HUB_MQTT_BROKER_PORT,
|
||||||
|
topic=HUB_MQTT_TOPIC,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
|
||||||
|
# This adapter acts as a bridge between the Agent and the Hub
|
||||||
agent_adapter = AgentMQTTAdapter(
|
agent_adapter = AgentMQTTAdapter(
|
||||||
broker_host=MQTT_BROKER_HOST,
|
broker_host=MQTT_BROKER_HOST,
|
||||||
broker_port=MQTT_BROKER_PORT,
|
broker_port=MQTT_BROKER_PORT,
|
||||||
topic=MQTT_TOPIC,
|
topic=MQTT_TOPIC,
|
||||||
hub_gateway=hub_adapter,
|
hub_gateway=hub_adapter,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Connect to the MQTT broker and start listening for messages
|
logging.info(f"Starting Edge module. Connecting to Agent Broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
|
||||||
|
# Connect to the MQTT broker and start listening for messages from Agent
|
||||||
agent_adapter.connect()
|
agent_adapter.connect()
|
||||||
agent_adapter.start()
|
agent_adapter.start()
|
||||||
# Keep the system running indefinitely (you can add other logic as needed)
|
|
||||||
|
# Keep the system running indefinitely to process incoming data streams
|
||||||
while True:
|
while True:
|
||||||
pass
|
pass
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
||||||
agent_adapter.stop()
|
agent_adapter.stop()
|
||||||
logging.info("System stopped.")
|
logging.info("System stopped.")
|
||||||
Reference in New Issue
Block a user