Files
IoT-Systems/edge/app/adapters/agent_mqtt_adapter.py
esk4nz c085a49c8c implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main
- Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP.
- Edge: Updated AgentData entity and processing logic to support user_id.
- Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
2026-03-23 21:31:31 +02:00

80 lines
2.7 KiB
Python

import logging
import paho.mqtt.client as mqtt
from app.interfaces.agent_gateway import AgentGateway
from app.entities.agent_data import AgentData, GpsData
from app.usecases.data_processing import process_agent_data
from app.interfaces.hub_gateway import HubGateway
class AgentMQTTAdapter(AgentGateway):
    """Agent gateway that receives agent data over MQTT and forwards it to the hub.

    The adapter subscribes to a single topic. Every incoming message is
    decoded as UTF-8 JSON, validated into an ``AgentData`` instance, passed
    through ``process_agent_data`` and, when that returns a result, handed to
    ``hub_gateway.save_data``.
    """

    def __init__(
        self,
        broker_host,
        broker_port,
        topic,
        hub_gateway: HubGateway,
        batch_size=10,
    ):
        """Store the connection settings and create the MQTT client.

        No network activity happens here; call ``connect()`` and ``start()``.

        Args:
            broker_host: Host name or address of the MQTT broker.
            broker_port: TCP port of the MQTT broker.
            topic: Topic subscribed to once the connection is established.
            hub_gateway: Gateway that receives processed data via ``save_data``.
            batch_size: Stored on the instance; not used by any method of
                this class.
        """
        self.batch_size = batch_size
        # MQTT
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topic = topic
        self.client = mqtt.Client()
        # Hub
        self.hub_gateway = hub_gateway

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's connection result.

        Subscribes to ``self.topic`` when ``rc`` is 0; any other code is
        logged as an error and no subscription is made.
        """
        if rc == 0:
            logging.info("Connected to MQTT broker")
            self.client.subscribe(self.topic)
        else:
            # A refused connection means no data will ever arrive, so it must
            # be visible at the default log level.
            logging.error("Failed to connect to MQTT broker with code: %s", rc)

    def on_message(self, client, userdata, msg):
        """Process one agent message and send the result to the hub gateway.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: Received message; ``msg.payload`` must be UTF-8 encoded JSON
                that validates strictly against ``AgentData``.
        """
        try:
            payload: str = msg.payload.decode("utf-8")
            agent_data = AgentData.model_validate_json(payload, strict=True)
            processed_data = process_agent_data(agent_data)
            if processed_data is None:
                # The use case produced nothing to report for this sample.
                logging.info("Road is fine, no data sent to hub.")
            elif not self.hub_gateway.save_data(processed_data):
                # A falsy result from save_data is treated as hub failure.
                logging.error("Hub is not available")
        except Exception as e:
            # Deliberately broad: this is the callback boundary, and one bad
            # message must not stop later messages from being handled. Log at
            # error level with the traceback so failures are not silent.
            logging.exception("Error processing MQTT message: %s", e)

    def connect(self):
        """Register the callbacks and connect to the broker (keepalive 60 s)."""
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_host, self.broker_port, 60)

    def start(self):
        """Start the MQTT client's network loop."""
        self.client.loop_start()

    def stop(self):
        """Disconnect from the broker and stop the network loop."""
        # Disconnect first so the broker session and socket are closed
        # instead of being left open after the loop has stopped.
        self.client.disconnect()
        self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
    import time

    # Without a configured handler the INFO messages below are not shown.
    logging.basicConfig(level=logging.INFO)

    broker_host = "localhost"
    broker_port = 1883
    topic = "agent_data_topic"
    # NOTE(review): HubGateway comes from app.interfaces; if it is an abstract
    # interface, replace it here with a concrete implementation - TODO confirm.
    store_gateway = HubGateway()
    adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
    adapter.connect()
    adapter.start()
    try:
        # The adapter does its work via the started client loop; the main
        # thread only has to stay alive. Sleep instead of spinning so the
        # process does not burn a CPU core while idle.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        adapter.stop()
        logging.info("Adapter stopped.")