2024-02-02 22:51:54 +02:00
|
|
|
from paho.mqtt import client as mqtt_client
|
|
|
|
|
import json
|
|
|
|
|
import time
|
|
|
|
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
|
|
|
|
from file_datasource import FileDatasource
|
|
|
|
|
import config
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def connect_mqtt(broker, port):
    """Create an MQTT client, connect it to the broker and start its loop.

    Args:
        broker: Host name or address of the MQTT broker.
        port: Port of the MQTT broker.

    Returns:
        The ``mqtt_client.Client`` instance, after ``connect()`` and
        ``loop_start()`` have been called on it.
    """
    print(f"CONNECT TO {broker}:{port}")

    def on_connect(client, userdata, flags, rc):
        """Report the connection result; any non-zero ``rc`` is a failure."""
        if rc == 0:
            print(f"Connected to MQTT Broker ({broker}:{port})!")
        else:
            # An f-string is required here so that the broker address and
            # the return code are actually interpolated into the message.
            print(f"Failed to connect {broker}:{port}, return code {rc}")
            # NOTE(review): loop_start() presumably runs this callback on a
            # background thread, in which case exit() may not stop the main
            # program -- confirm against the paho-mqtt documentation.
            exit(rc)  # Stop execution

    client = mqtt_client.Client()
    client.on_connect = on_connect
    client.connect(broker, port)
    client.loop_start()
    return client
|
|
|
|
|
|
|
|
|
|
|
2026-02-23 23:53:42 +02:00
|
|
|
def publish(client, topic, datasource, delay, max_sends=None):
    """Repeatedly read from *datasource* and publish each reading to *topic*.

    Each iteration sleeps for ``delay``, reads one item from the datasource,
    serialises it with ``AggregatedDataSchema`` and publishes the result
    through ``client``. A failed publish is only reported on stdout.

    Args:
        client: Object exposing ``publish(topic, payload)``.
        topic: Topic the serialised readings are published to.
        datasource: Object exposing ``startReading()`` and ``read()``.
        delay: Value handed to ``time.sleep`` before every read.
        max_sends: When truthy, the process exits with status 0 once this
            many iterations have completed; otherwise the loop never ends.
    """
    datasource.startReading()

    iterations = 0
    while True:
        iterations += 1

        time.sleep(delay)
        reading = datasource.read()
        payload = AggregatedDataSchema().dumps(reading)
        outcome = client.publish(topic, payload)

        # The first element of the publish result is its status code;
        # anything other than zero is reported as a failure.
        if outcome[0] != 0:
            print(f"Failed to send message to topic {topic}")

        if max_sends and iterations >= max_sends:
            # display test success
            exit(0)
|
|
|
|
|
|
2024-02-02 22:51:54 +02:00
|
|
|
|
|
|
|
|
def run():
    """Connect to the configured broker and publish the file-based data.

    The broker host/port, topic and delay are taken from ``config``.
    ``MAX_SENDS`` is optional there; when it is absent ``None`` is passed
    to ``publish``.
    """
    # Prepare mqtt client
    mqtt = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)

    # Prepare datasource
    source = FileDatasource("data/data.csv", "data/gps_data.csv")

    # Publish data in a loop (bounded only if config defines MAX_SENDS)
    publish(
        mqtt,
        config.MQTT_TOPIC,
        source,
        config.DELAY,
        getattr(config, "MAX_SENDS", None),
    )
|
2024-02-02 22:51:54 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Start publishing only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()
|