diff --git a/edge/app/adapters/agent_mqtt_adapter.py b/edge/app/adapters/agent_mqtt_adapter.py index 6b1f845..a45f928 100644 --- a/edge/app/adapters/agent_mqtt_adapter.py +++ b/edge/app/adapters/agent_mqtt_adapter.py @@ -52,27 +52,5 @@ class AgentMQTTAdapter(AgentGateway): self.client.on_message = self.on_message self.client.connect(self.broker_host, self.broker_port, 60) - def start(self): - self.client.loop_start() - - def stop(self): - self.client.loop_stop() - - -# Usage example: -if __name__ == "__main__": - broker_host = "localhost" - broker_port = 1883 - topic = "agent_data_topic" - # Assuming you have implemented the StoreGateway and passed it to the adapter - store_gateway = HubGateway() - adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway) - adapter.connect() - adapter.start() - try: - # Keep the adapter running in the background - while True: - pass - except KeyboardInterrupt: - adapter.stop() - logging.info("Adapter stopped.") + def loop_forever(self): + self.client.loop_forever() diff --git a/edge/app/interfaces/agent_gateway.py b/edge/app/interfaces/agent_gateway.py index 0e154f7..b868ab5 100644 --- a/edge/app/interfaces/agent_gateway.py +++ b/edge/app/interfaces/agent_gateway.py @@ -26,15 +26,8 @@ class AgentGateway(ABC): pass @abstractmethod - def start(self): + def loop_forever(self): """ - Method to start listening for messages from the agent. - """ - pass - - @abstractmethod - def stop(self): - """ - Method to stop the agent gateway and clean up resources. + Method to await for new messages. """ pass diff --git a/edge/main.py b/edge/main.py index 22a8f23..3e26330 100644 --- a/edge/main.py +++ b/edge/main.py @@ -3,7 +3,6 @@ import os from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter from app.adapters.hub_http_adapter import HubHttpAdapter from app.adapters.hub_mqtt_adapter import HubMqttAdapter -from threading import Event from config import ( MQTT_BROKER_HOST, MQTT_BROKER_PORT, @@ -26,9 +25,6 @@ if __name__ == "__main__": ], ) - # Initialize the stop event to prevent high CPU usage - stop_event = Event() - # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94) # This allows easy switching between HTTP and MQTT protocols if HUB_CONNECTION_TYPE.lower() == "http": @@ -54,19 +50,12 @@ if __name__ == "__main__": ) try: - logging.info(f"Starting Edge module. Connecting to Agent Broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") - # Connect to the MQTT broker and start listening for messages from Agent + logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") agent_adapter.connect() - agent_adapter.start() - - logging.info("Edge module started successfully. Waiting for data...") - - # Block the main thread efficiently without consuming CPU cycles - stop_event.wait() + logging.info("Broker connection success. Waiting for data...") + agent_adapter.loop_forever() except KeyboardInterrupt: - # Stop the MQTT adapter and exit gracefully if interrupted by the user - logging.info("Stop signal received. Shutting down...") - agent_adapter.stop() - stop_event.set() # Release the event - logging.info("System stopped.") \ No newline at end of file + logging.info("Interrupt signal received. Shutting down...") + agent_adapter.disconnect() + logging.info("Disconnected from MQTT broker.")