Files
IoT-Systems/agent/src/main.py

52 lines
1.5 KiB
Python
Raw Normal View History

2024-02-02 22:51:54 +02:00
from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import logging
2024-02-02 22:51:54 +02:00
import config
def connect_mqtt(broker, port):
"""Create MQTT client"""
print(f"CONNECT TO {broker}:{port}")
def on_connect(client, userdata, flags, rc):
if rc == 0:
print(f"Connected to MQTT Broker ({broker}:{port})!")
else:
print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution
2026-03-25 15:05:01 +02:00
logging.info(f"Acting as USER_ID = {config.USER_ID}")
2024-02-02 22:51:54 +02:00
client = mqtt_client.Client()
client.on_connect = on_connect
client.connect(broker, port)
client.loop_start()
return client
def publish(client, topic, datasource):
2024-02-02 22:51:54 +02:00
datasource.startReading()
while True:
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
2026-03-25 15:05:01 +02:00
logging.debug(f"Published to {topic}: {msg[:50]}...")
2024-02-02 22:51:54 +02:00
status = result[0]
if status != 0:
2026-03-25 15:05:01 +02:00
logging.error(f"Failed to send message to topic {topic}")
2024-02-02 22:51:54 +02:00
def run():
2026-03-25 15:05:01 +02:00
logging.basicConfig(level = logging.INFO)
2024-02-02 22:51:54 +02:00
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
2026-03-25 21:43:13 +02:00
datasource = FileDatasource(16384.0, "data/accelerometer.csv", config.GPS_SOURCE, "data/parking.csv")
2024-02-02 22:51:54 +02:00
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource)
2024-02-02 22:51:54 +02:00
if __name__ == "__main__":
2026-02-26 12:11:07 +02:00
run()