diff --git a/edge/Dockerfile b/edge/Dockerfile new file mode 100644 index 0000000..f87d862 --- /dev/null +++ b/edge/Dockerfile @@ -0,0 +1,11 @@ +# Use the official Python image as the base image +FROM python:3.9-slim +# Set the working directory inside the container +WORKDIR /app +# Copy the requirements.txt file and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +# Copy the entire application into the container +COPY . . +# Run the main.py script inside the container when it starts +CMD ["python", "main.py"] \ No newline at end of file diff --git a/edge/app/adapters/agent_mqtt_adapter.py b/edge/app/adapters/agent_mqtt_adapter.py new file mode 100644 index 0000000..855370f --- /dev/null +++ b/edge/app/adapters/agent_mqtt_adapter.py @@ -0,0 +1,76 @@ +import logging +import paho.mqtt.client as mqtt +from app.interfaces.agent_gateway import AgentGateway +from app.entities.agent_data import AgentData, GpsData +from app.usecases.data_processing import process_agent_data +from app.interfaces.hub_gateway import HubGateway + + +class AgentMQTTAdapter(AgentGateway): + def __init__( + self, + broker_host, + broker_port, + topic, + hub_gateway: HubGateway, + batch_size=10, + ): + self.batch_size = batch_size + # MQTT + self.broker_host = broker_host + self.broker_port = broker_port + self.topic = topic + self.client = mqtt.Client() + # Hub + self.hub_gateway = hub_gateway + + def on_connect(self, client, userdata, flags, rc): + if rc == 0: + logging.info("Connected to MQTT broker") + self.client.subscribe(self.topic) + else: + logging.info(f"Failed to connect to MQTT broker with code: {rc}") + + def on_message(self, client, userdata, msg): + """Processing agent data and sent it to hub gateway""" + try: + payload: str = msg.payload.decode("utf-8") + # Create AgentData instance with the received data + agent_data = AgentData.model_validate_json(payload, strict=True) + # Process the received data (you can call a use case here if needed) + processed_data = process_agent_data(agent_data) + # Store the agent_data in the database (you can send it to the data processing module) + if not self.hub_gateway.save_data(processed_data): + logging.error("Hub is not available") + except Exception as e: + logging.info(f"Error processing MQTT message: {e}") + + def connect(self): + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + self.client.connect(self.broker_host, self.broker_port, 60) + + def start(self): + self.client.loop_start() + + def stop(self): + self.client.loop_stop() + + +# Usage example: +if __name__ == "__main__": + broker_host = "localhost" + broker_port = 1883 + topic = "agent_data_topic" + # Assuming you have implemented the StoreGateway and passed it to the adapter + store_gateway = HubGateway() + adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway) + adapter.connect() + adapter.start() + try: + # Keep the adapter running in the background + while True: + pass + except KeyboardInterrupt: + adapter.stop() + logging.info("Adapter stopped.") diff --git a/edge/app/adapters/hub_http_adapter.py b/edge/app/adapters/hub_http_adapter.py new file mode 100644 index 0000000..d7cfe30 --- /dev/null +++ b/edge/app/adapters/hub_http_adapter.py @@ -0,0 +1,29 @@ +import logging + +import requests as requests + +from app.entities.processed_agent_data import ProcessedAgentData +from app.interfaces.hub_gateway import HubGateway + + +class HubHttpAdapter(HubGateway): + def __init__(self, api_base_url): + self.api_base_url = api_base_url + + def save_data(self, processed_data: ProcessedAgentData): + """ + Save the processed road data to the Hub. + Parameters: + processed_data (ProcessedAgentData): Processed road data to be saved. + Returns: + bool: True if the data is successfully saved, False otherwise. + """ + url = f"{self.api_base_url}/processed_agent_data/" + + response = requests.post(url, data=processed_data.model_dump_json()) + if response.status_code != 200: + logging.info( + f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}" + ) + return False + return True diff --git a/edge/app/adapters/hub_mqtt_adapter.py b/edge/app/adapters/hub_mqtt_adapter.py new file mode 100644 index 0000000..7c3de55 --- /dev/null +++ b/edge/app/adapters/hub_mqtt_adapter.py @@ -0,0 +1,50 @@ +import logging + +import requests as requests +from paho.mqtt import client as mqtt_client + +from app.entities.processed_agent_data import ProcessedAgentData +from app.interfaces.hub_gateway import HubGateway + + +class HubMqttAdapter(HubGateway): + def __init__(self, broker, port, topic): + self.broker = broker + self.port = port + self.topic = topic + self.mqtt_client = self._connect_mqtt(broker, port) + + def save_data(self, processed_data: ProcessedAgentData): + """ + Save the processed road data to the Hub. + Parameters: + processed_data (ProcessedAgentData): Processed road data to be saved. + Returns: + bool: True if the data is successfully saved, False otherwise. + """ + msg = processed_data.model_dump_json() + result = self.mqtt_client.publish(self.topic, msg) + status = result[0] + if status == 0: + return True + else: + print(f"Failed to send message to topic {self.topic}") + return False + + @staticmethod + def _connect_mqtt(broker, port): + """Create MQTT client""" + print(f"CONNECT TO {broker}:{port}") + + def on_connect(client, userdata, flags, rc): + if rc == 0: + print(f"Connected to MQTT Broker ({broker}:{port})!") + else: + print("Failed to connect {broker}:{port}, return code %d\n", rc) + exit(rc) # Stop execution + + client = mqtt_client.Client() + client.on_connect = on_connect + client.connect(broker, port) + client.loop_start() + return client diff --git a/edge/app/entities/agent_data.py b/edge/app/entities/agent_data.py new file mode 100644 index 0000000..3f92a8b --- /dev/null +++ b/edge/app/entities/agent_data.py @@ -0,0 +1,32 @@ +from datetime import datetime +from pydantic import BaseModel, field_validator + + +class AccelerometerData(BaseModel): + x: float + y: float + z: float + + +class GpsData(BaseModel): + latitude: float + longitude: float + + +class AgentData(BaseModel): + accelerometer: AccelerometerData + gps: GpsData + timestamp: datetime + + @classmethod + @field_validator("timestamp", mode="before") + def parse_timestamp(cls, value): + # Convert the timestamp to a datetime object + if isinstance(value, datetime): + return value + try: + return datetime.fromisoformat(value) + except (TypeError, ValueError): + raise ValueError( + "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)." + ) diff --git a/edge/app/entities/processed_agent_data.py b/edge/app/entities/processed_agent_data.py new file mode 100644 index 0000000..95accd4 --- /dev/null +++ b/edge/app/entities/processed_agent_data.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel +from app.entities.agent_data import AgentData + + +class ProcessedAgentData(BaseModel): + road_state: str + agent_data: AgentData diff --git a/edge/app/interfaces/agent_gateway.py b/edge/app/interfaces/agent_gateway.py new file mode 100644 index 0000000..0e154f7 --- /dev/null +++ b/edge/app/interfaces/agent_gateway.py @@ -0,0 +1,40 @@ +from abc import ABC, abstractmethod + + +class AgentGateway(ABC): + """ + Abstract class representing the Agent Gateway interface. + All agent gateway adapters must implement these methods. + """ + + @abstractmethod + def on_message(self, client, userdata, msg): + """ + Method to handle incoming messages from the agent. + Parameters: + client: MQTT client instance. + userdata: Any additional user data passed to the MQTT client. + msg: The MQTT message received from the agent. + """ + pass + + @abstractmethod + def connect(self): + """ + Method to establish a connection to the agent. + """ + pass + + @abstractmethod + def start(self): + """ + Method to start listening for messages from the agent. + """ + pass + + @abstractmethod + def stop(self): + """ + Method to stop the agent gateway and clean up resources. + """ + pass diff --git a/edge/app/interfaces/hub_gateway.py b/edge/app/interfaces/hub_gateway.py new file mode 100644 index 0000000..eaf8b57 --- /dev/null +++ b/edge/app/interfaces/hub_gateway.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod +from app.entities.processed_agent_data import ProcessedAgentData + + +class HubGateway(ABC): + """ + Abstract class representing the Store Gateway interface. + All store gateway adapters must implement these methods. + """ + + @abstractmethod + def save_data(self, processed_data: ProcessedAgentData) -> bool: + """ + Method to save the processed agent data in the database. + Parameters: + processed_data (ProcessedAgentData): The processed agent data to be saved. + Returns: + bool: True if the data is successfully saved, False otherwise. + """ + pass diff --git a/edge/app/usecases/data_processing.py b/edge/app/usecases/data_processing.py new file mode 100644 index 0000000..712643c --- /dev/null +++ b/edge/app/usecases/data_processing.py @@ -0,0 +1,15 @@ +from app.entities.agent_data import AgentData +from app.entities.processed_agent_data import ProcessedAgentData + + +def process_agent_data( + agent_data: AgentData, +) -> ProcessedAgentData: + """ + Process agent data and classify the state of the road surface. + Parameters: + agent_data (AgentData): Agent data that containing accelerometer, GPS, and timestamp. + Returns: + processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. + """ + # Implement it diff --git a/edge/config.py b/edge/config.py new file mode 100644 index 0000000..b92d0c3 --- /dev/null +++ b/edge/config.py @@ -0,0 +1,24 @@ +import os + + +def try_parse_int(value: str): + try: + return int(value) + except Exception: + return None + + +# Configuration for agent MQTT +MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost" +MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883 +MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic" + +# Configuration for hub MQTT +HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" +HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 +HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" + +# Configuration for the Hub +HUB_HOST = os.environ.get("HUB_HOST") or "localhost" +HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 +HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" diff --git a/edge/main.py b/edge/main.py new file mode 100644 index 0000000..55a933a --- /dev/null +++ b/edge/main.py @@ -0,0 +1,51 @@ +import logging +from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter +from app.adapters.hub_http_adapter import HubHttpAdapter +from app.adapters.hub_mqtt_adapter import HubMqttAdapter +from config import ( + MQTT_BROKER_HOST, + MQTT_BROKER_PORT, + MQTT_TOPIC, + HUB_URL, + HUB_MQTT_BROKER_HOST, + HUB_MQTT_BROKER_PORT, + HUB_MQTT_TOPIC, +) + +if __name__ == "__main__": + # Configure logging settings + logging.basicConfig( + level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) + format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", + handlers=[ + logging.StreamHandler(), # Output log messages to the console + logging.FileHandler("app.log"), # Save log messages to a file + ], + ) + # Create an instance of the StoreApiAdapter using the configuration + hub_adapter = HubHttpAdapter( + api_base_url=HUB_URL, + ) + # hub_adapter = HubMqttAdapter( + # broker=HUB_MQTT_BROKER_HOST, + # port=HUB_MQTT_BROKER_PORT, + # topic=HUB_MQTT_TOPIC, + # ) + # Create an instance of the AgentMQTTAdapter using the configuration + agent_adapter = AgentMQTTAdapter( + broker_host=MQTT_BROKER_HOST, + broker_port=MQTT_BROKER_PORT, + topic=MQTT_TOPIC, + hub_gateway=hub_adapter, + ) + try: + # Connect to the MQTT broker and start listening for messages + agent_adapter.connect() + agent_adapter.start() + # Keep the system running indefinitely (you can add other logic as needed) + while True: + pass + except KeyboardInterrupt: + # Stop the MQTT adapter and exit gracefully if interrupted by the user + agent_adapter.stop() + logging.info("System stopped.") diff --git a/edge/requirements.txt b/edge/requirements.txt new file mode 100644 index 0000000..eac1d86 --- /dev/null +++ b/edge/requirements.txt @@ -0,0 +1,43 @@ +absl-py==2.0.0 +annotated-types==0.5.0 +astunparse==1.6.3 +cachetools==5.3.2 +certifi==2023.7.22 +charset-normalizer==3.2.0 +flatbuffers==23.5.26 +gast==0.5.4 +google-auth==2.25.2 +google-auth-oauthlib==1.2.0 +google-pasta==0.2.0 +grpcio==1.60.0 +h5py==3.10.0 +idna==3.4 +keras==2.15.0 +libclang==16.0.6 +Markdown==3.5.1 +MarkupSafe==2.1.3 +ml-dtypes==0.2.0 +numpy==1.26.2 +oauthlib==3.2.2 +opt-einsum==3.3.0 +packaging==23.2 +paho-mqtt==1.6.1 +protobuf==4.23.4 +pyasn1==0.5.1 +pyasn1-modules==0.3.0 +pydantic==2.3.0 +pydantic_core==2.6.3 +requests==2.31.0 +requests-oauthlib==1.3.1 +rsa==4.9 +six==1.16.0 +tensorboard==2.15.1 +tensorboard-data-server==0.7.2 +tensorflow==2.15.0.post1 +tensorflow-estimator==2.15.0 +tensorflow-io-gcs-filesystem==0.34.0 +termcolor==2.4.0 +typing_extensions==4.7.1 +urllib3==2.0.4 +Werkzeug==3.0.1 +wrapt==1.14.1