diff --git a/agent/src/config.py b/agent/src/config.py index f507c71..c5ae3f3 100644 --- a/agent/src/config.py +++ b/agent/src/config.py @@ -8,7 +8,7 @@ def try_parse(type, value: str): return None -USER_ID = 1 +USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1 # MQTT config MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 diff --git a/agent/src/main.py b/agent/src/main.py index ce17d6b..db81260 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -1,6 +1,7 @@ from paho.mqtt import client as mqtt_client from schema.aggregated_data_schema import AggregatedDataSchema from file_datasource import FileDatasource +import logging import config @@ -28,6 +29,7 @@ def publish(client, topic, datasource): data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) + logging.info(f"Published to {topic}: {msg[:50]}...") status = result[0] if status != 0: print(f"Failed to send message to topic {topic}") diff --git a/docker-compose.yaml b/docker-compose.yaml index 54cdbbe..26f0276 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -22,6 +22,7 @@ services: depends_on: - mqtt environment: + PYTHONUNBUFFERED: 1 MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_PORT: 1883 MQTT_TOPIC: "agent_data_topic" @@ -39,9 +40,10 @@ services: environment: MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_PORT: 1883 - MQTT_TOPIC: " " - HUB_HOST: "store" + MQTT_TOPIC: "agent_data_topic" + HUB_HOST: "hub" HUB_PORT: 8000 + HUB_CONNECTION_TYPE: "http" HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_TOPIC: "processed_data_topic" @@ -89,6 +91,7 @@ services: - postgres_db restart: always environment: + PYTHONUNBUFFERED: 1 POSTGRES_USER: user POSTGRES_PASSWORD: pass POSTGRES_DB: test_db @@ -120,6 +123,7 @@ services: - redis - store environment: + PYTHONUNBUFFERED: 1 STORE_API_HOST: "store" STORE_API_PORT: 8000 REDIS_HOST: "redis" diff --git a/edge/app/adapters/agent_mqtt_adapter.py b/edge/app/adapters/agent_mqtt_adapter.py index 855370f..76c0c0a 100644 --- a/edge/app/adapters/agent_mqtt_adapter.py +++ b/edge/app/adapters/agent_mqtt_adapter.py @@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway): broker_port, topic, hub_gateway: HubGateway, - batch_size=10, ): - self.batch_size = batch_size # MQTT self.broker_host = broker_host self.broker_port = broker_port @@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway): """Processing agent data and sent it to hub gateway""" try: payload: str = msg.payload.decode("utf-8") - # Create AgentData instance with the received data + agent_data = AgentData.model_validate_json(payload, strict=True) - # Process the received data (you can call a use case here if needed) processed_data = process_agent_data(agent_data) - # Store the agent_data in the database (you can send it to the data processing module) - if not self.hub_gateway.save_data(processed_data): - logging.error("Hub is not available") + + if self.hub_gateway.save_data(processed_data): + logging.info("Processed data successfully forwarded to the Hub.") + else: + logging.error("Failed to send data: Hub gateway is unavailable.") except Exception as e: - logging.info(f"Error processing MQTT message: {e}") + logging.error(f"Error processing MQTT message: {e}") def connect(self): self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.connect(self.broker_host, self.broker_port, 60) - def start(self): - self.client.loop_start() - - def stop(self): - self.client.loop_stop() - - -# Usage example: -if __name__ == "__main__": - broker_host = "localhost" - broker_port = 1883 - topic = "agent_data_topic" - # Assuming you have implemented the StoreGateway and passed it to the adapter - store_gateway = HubGateway() - adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway) - adapter.connect() - adapter.start() - try: - # Keep the adapter running in the background - while True: - pass - except KeyboardInterrupt: - adapter.stop() - logging.info("Adapter stopped.") + def loop_forever(self): + self.client.loop_forever() diff --git a/edge/app/entities/agent_data.py b/edge/app/entities/agent_data.py index 3f92a8b..fdb2b5e 100644 --- a/edge/app/entities/agent_data.py +++ b/edge/app/entities/agent_data.py @@ -14,6 +14,7 @@ class GpsData(BaseModel): class AgentData(BaseModel): + user_id: int accelerometer: AccelerometerData gps: GpsData timestamp: datetime diff --git a/edge/app/interfaces/agent_gateway.py b/edge/app/interfaces/agent_gateway.py index 0e154f7..b868ab5 100644 --- a/edge/app/interfaces/agent_gateway.py +++ b/edge/app/interfaces/agent_gateway.py @@ -26,15 +26,8 @@ class AgentGateway(ABC): pass @abstractmethod - def start(self): + def loop_forever(self): """ - Method to start listening for messages from the agent. - """ - pass - - @abstractmethod - def stop(self): - """ - Method to stop the agent gateway and clean up resources. + Method to await for new messages. """ pass diff --git a/edge/app/usecases/data_processing.py b/edge/app/usecases/data_processing.py index 712643c..e4bb712 100644 --- a/edge/app/usecases/data_processing.py +++ b/edge/app/usecases/data_processing.py @@ -13,3 +13,7 @@ def process_agent_data( processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. """ # Implement it + return ProcessedAgentData( + road_state="normal", + agent_data=agent_data + ) diff --git a/edge/config.py b/edge/config.py index b92d0c3..ee6efa3 100644 --- a/edge/config.py +++ b/edge/config.py @@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic" # Configuration for hub MQTT HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 -HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" +HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic" # Configuration for the Hub HUB_HOST = os.environ.get("HUB_HOST") or "localhost" -HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 +HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000 HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" + +# For choosing type of connection +HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt" \ No newline at end of file diff --git a/edge/main.py b/edge/main.py index b2166a2..198e7c9 100644 --- a/edge/main.py +++ b/edge/main.py @@ -10,42 +10,51 @@ from config import ( HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_PORT, HUB_MQTT_TOPIC, + HUB_CONNECTION_TYPE, ) if __name__ == "__main__": # Configure logging settings logging.basicConfig( - level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) + level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", handlers=[ - logging.StreamHandler(), # Output log messages to the console - logging.FileHandler("app.log"), # Save log messages to a file + logging.StreamHandler(), + logging.FileHandler("app.log"), ], ) - # Create an instance of the StoreApiAdapter using the configuration - # hub_adapter = HubHttpAdapter( - # api_base_url=HUB_URL, - # ) - hub_adapter = HubMqttAdapter( - broker=HUB_MQTT_BROKER_HOST, - port=HUB_MQTT_BROKER_PORT, - topic=HUB_MQTT_TOPIC, - ) - # Create an instance of the AgentMQTTAdapter using the configuration + + # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94) + # This allows easy switching between HTTP and MQTT protocols + if HUB_CONNECTION_TYPE.lower() == "http": + logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)") + hub_adapter = HubHttpAdapter( + api_base_url=HUB_URL, + ) + else: + logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)") + hub_adapter = HubMqttAdapter( + broker=HUB_MQTT_BROKER_HOST, + port=HUB_MQTT_BROKER_PORT, + topic=HUB_MQTT_TOPIC, + ) + + # Create an instance of the AgentMQTTAdapter using the selected hub adapter + # This adapter acts as a bridge between the Agent and the Hub agent_adapter = AgentMQTTAdapter( broker_host=MQTT_BROKER_HOST, broker_port=MQTT_BROKER_PORT, topic=MQTT_TOPIC, hub_gateway=hub_adapter, ) + try: - # Connect to the MQTT broker and start listening for messages + logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") agent_adapter.connect() - agent_adapter.start() - # Keep the system running indefinitely (you can add other logic as needed) - while True: - pass + + logging.info("Broker connection success. Waiting for data...") + agent_adapter.loop_forever() except KeyboardInterrupt: - # Stop the MQTT adapter and exit gracefully if interrupted by the user - agent_adapter.stop() - logging.info("System stopped.") + logging.info("Interrupt signal received. Shutting down...") + agent_adapter.disconnect() + logging.info("Disconnected from MQTT broker.")