mirror of
https://github.com/Rhinemann/IoT-Systems.git
synced 2026-03-14 20:50:39 +02:00
77 lines
2.5 KiB
Python
77 lines
2.5 KiB
Python
import logging
|
|
import paho.mqtt.client as mqtt
|
|
from app.interfaces.agent_gateway import AgentGateway
|
|
from app.entities.agent_data import AgentData, GpsData
|
|
from app.usecases.data_processing import process_agent_data
|
|
from app.interfaces.hub_gateway import HubGateway
|
|
|
|
|
|
class AgentMQTTAdapter(AgentGateway):
    """MQTT-backed agent gateway.

    Subscribes to a single MQTT topic, validates every incoming message as
    ``AgentData``, runs it through ``process_agent_data`` and hands the result
    to the configured hub gateway.
    """

    def __init__(
        self,
        broker_host: str,
        broker_port: int,
        topic: str,
        hub_gateway: HubGateway,
        batch_size: int = 10,
    ) -> None:
        """Store the connection settings and create the MQTT client.

        Args:
            broker_host: Host name or address of the MQTT broker.
            broker_port: TCP port of the MQTT broker.
            topic: Topic subscribed to once the connection is established.
            hub_gateway: Gateway that receives the processed data.
            batch_size: Stored on the instance; not used by this class itself.
        """
        self.batch_size = batch_size
        # MQTT
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.topic = topic
        self.client = mqtt.Client()
        # Hub
        self.hub_gateway = hub_gateway

    def on_connect(self, client, userdata, flags, rc) -> None:
        """Subscribe to the topic on success, report the failure otherwise.

        ``rc == 0`` is treated as a successful connection; any other value is
        logged as an error together with the code.
        """
        if rc == 0:
            logging.info("Connected to MQTT broker")
            self.client.subscribe(self.topic)
        else:
            # A refused connection means no data will ever arrive, so it must
            # be visible at the default (WARNING) log threshold.
            logging.error(f"Failed to connect to MQTT broker with code: {rc}")

    def on_message(self, client, userdata, msg) -> None:
        """Process agent data and send it to the hub gateway."""
        try:
            payload: str = msg.payload.decode("utf-8")
            # Create AgentData instance with the received data
            agent_data = AgentData.model_validate_json(payload, strict=True)
            # Process the received data
            processed_data = process_agent_data(agent_data)
            # Forward the processed data to the hub; a falsy result is
            # reported as the hub being unavailable.
            if not self.hub_gateway.save_data(processed_data):
                logging.error("Hub is not available")
        except Exception as e:
            # Callback boundary: one bad message must not break the handling
            # of subsequent ones, but the failure (with traceback) has to be
            # recorded at error level instead of being hidden at INFO.
            logging.exception(f"Error processing MQTT message: {e}")

    def connect(self) -> None:
        """Register the callbacks and connect to the broker."""
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker_host, self.broker_port, keepalive=60)

    def start(self) -> None:
        """Start the MQTT client's network loop (``loop_start``)."""
        self.client.loop_start()

    def stop(self) -> None:
        """Stop the MQTT client's network loop (``loop_stop``)."""
        self.client.loop_stop()
|
|
|
|
|
|
# Usage example:
|
|
if __name__ == "__main__":
    import time

    # The root logger defaults to WARNING, which would silently drop the
    # INFO messages emitted by the adapter and by this script.
    logging.basicConfig(level=logging.INFO)

    broker_host = "localhost"
    broker_port = 1883
    topic = "agent_data_topic"
    # Assuming a usable HubGateway implementation is available to pass to the adapter
    store_gateway = HubGateway()
    adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
    adapter.connect()
    adapter.start()
    try:
        # The adapter runs in the background; sleep instead of spinning so
        # the main thread does not burn a full CPU core while waiting.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        adapter.stop()
        logging.info("Adapter stopped.")
|