from paho.mqtt import client as mqtt_client from schema.aggregated_data_schema import AggregatedDataSchema from file_datasource import FileDatasource import config def connect_mqtt(broker, port): """Create MQTT client""" print(f"CONNECT TO {broker}:{port}") def on_connect(client, userdata, flags, rc): if rc == 0: print(f"Connected to MQTT Broker ({broker}:{port})!") else: print("Failed to connect {broker}:{port}, return code %d\n", rc) exit(rc) # Stop execution client = mqtt_client.Client() client.on_connect = on_connect client.connect(broker, port) client.loop_start() return client def publish(client, topic, datasource, max_sends = None): datasource.startReading() i = 0 while True: i += 1 data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) status = result[0] if status != 0: print(f"Failed to send message to topic {topic}") if max_sends and i >= max_sends: # display test success exit(0) def run(): # Prepare mqtt client client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) # Prepare datasource datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") # Infinity publish data publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None)) if __name__ == "__main__": run()