add: edge template
This commit is contained in:
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
76
edge/app/adapters/agent_mqtt_adapter.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import logging
|
||||
import paho.mqtt.client as mqtt
|
||||
from app.interfaces.agent_gateway import AgentGateway
|
||||
from app.entities.agent_data import AgentData, GpsData
|
||||
from app.usecases.data_processing import process_agent_data
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class AgentMQTTAdapter(AgentGateway):
|
||||
def __init__(
|
||||
self,
|
||||
broker_host,
|
||||
broker_port,
|
||||
topic,
|
||||
hub_gateway: HubGateway,
|
||||
batch_size=10,
|
||||
):
|
||||
self.batch_size = batch_size
|
||||
# MQTT
|
||||
self.broker_host = broker_host
|
||||
self.broker_port = broker_port
|
||||
self.topic = topic
|
||||
self.client = mqtt.Client()
|
||||
# Hub
|
||||
self.hub_gateway = hub_gateway
|
||||
|
||||
def on_connect(self, client, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
logging.info("Connected to MQTT broker")
|
||||
self.client.subscribe(self.topic)
|
||||
else:
|
||||
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
|
||||
|
||||
def on_message(self, client, userdata, msg):
|
||||
"""Processing agent data and sent it to hub gateway"""
|
||||
try:
|
||||
payload: str = msg.payload.decode("utf-8")
|
||||
# Create AgentData instance with the received data
|
||||
agent_data = AgentData.model_validate_json(payload, strict=True)
|
||||
# Process the received data (you can call a use case here if needed)
|
||||
processed_data = process_agent_data(agent_data)
|
||||
# Store the agent_data in the database (you can send it to the data processing module)
|
||||
if not self.hub_gateway.save_data(processed_data):
|
||||
logging.error("Hub is not available")
|
||||
except Exception as e:
|
||||
logging.info(f"Error processing MQTT message: {e}")
|
||||
|
||||
def connect(self):
|
||||
self.client.on_connect = self.on_connect
|
||||
self.client.on_message = self.on_message
|
||||
self.client.connect(self.broker_host, self.broker_port, 60)
|
||||
|
||||
def start(self):
|
||||
self.client.loop_start()
|
||||
|
||||
def stop(self):
|
||||
self.client.loop_stop()
|
||||
|
||||
|
||||
# Usage example:
|
||||
if __name__ == "__main__":
|
||||
broker_host = "localhost"
|
||||
broker_port = 1883
|
||||
topic = "agent_data_topic"
|
||||
# Assuming you have implemented the StoreGateway and passed it to the adapter
|
||||
store_gateway = HubGateway()
|
||||
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
|
||||
adapter.connect()
|
||||
adapter.start()
|
||||
try:
|
||||
# Keep the adapter running in the background
|
||||
while True:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
adapter.stop()
|
||||
logging.info("Adapter stopped.")
|
||||
29
edge/app/adapters/hub_http_adapter.py
Normal file
29
edge/app/adapters/hub_http_adapter.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import logging
|
||||
|
||||
import requests as requests
|
||||
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class HubHttpAdapter(HubGateway):
|
||||
def __init__(self, api_base_url):
|
||||
self.api_base_url = api_base_url
|
||||
|
||||
def save_data(self, processed_data: ProcessedAgentData):
|
||||
"""
|
||||
Save the processed road data to the Hub.
|
||||
Parameters:
|
||||
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||
Returns:
|
||||
bool: True if the data is successfully saved, False otherwise.
|
||||
"""
|
||||
url = f"{self.api_base_url}/processed_agent_data/"
|
||||
|
||||
response = requests.post(url, data=processed_data.model_dump_json())
|
||||
if response.status_code != 200:
|
||||
logging.info(
|
||||
f"Invalid Hub response\nData: {processed_data.model_dump_json()}\nResponse: {response}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
50
edge/app/adapters/hub_mqtt_adapter.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import logging
|
||||
|
||||
import requests as requests
|
||||
from paho.mqtt import client as mqtt_client
|
||||
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
from app.interfaces.hub_gateway import HubGateway
|
||||
|
||||
|
||||
class HubMqttAdapter(HubGateway):
|
||||
def __init__(self, broker, port, topic):
|
||||
self.broker = broker
|
||||
self.port = port
|
||||
self.topic = topic
|
||||
self.mqtt_client = self._connect_mqtt(broker, port)
|
||||
|
||||
def save_data(self, processed_data: ProcessedAgentData):
|
||||
"""
|
||||
Save the processed road data to the Hub.
|
||||
Parameters:
|
||||
processed_data (ProcessedAgentData): Processed road data to be saved.
|
||||
Returns:
|
||||
bool: True if the data is successfully saved, False otherwise.
|
||||
"""
|
||||
msg = processed_data.model_dump_json()
|
||||
result = self.mqtt_client.publish(self.topic, msg)
|
||||
status = result[0]
|
||||
if status == 0:
|
||||
return True
|
||||
else:
|
||||
print(f"Failed to send message to topic {self.topic}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _connect_mqtt(broker, port):
|
||||
"""Create MQTT client"""
|
||||
print(f"CONNECT TO {broker}:{port}")
|
||||
|
||||
def on_connect(client, userdata, flags, rc):
|
||||
if rc == 0:
|
||||
print(f"Connected to MQTT Broker ({broker}:{port})!")
|
||||
else:
|
||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||
exit(rc) # Stop execution
|
||||
|
||||
client = mqtt_client.Client()
|
||||
client.on_connect = on_connect
|
||||
client.connect(broker, port)
|
||||
client.loop_start()
|
||||
return client
|
||||
Reference in New Issue
Block a user