Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 987e968dd4 | |||
| db63eb6d79 |
+5
-2
@@ -16,6 +16,8 @@ def connect_mqtt(broker, port):
|
|||||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||||
exit(rc) # Stop execution
|
exit(rc) # Stop execution
|
||||||
|
|
||||||
|
logging.info(f"Acting as USER_ID = {config.USER_ID}")
|
||||||
|
|
||||||
client = mqtt_client.Client()
|
client = mqtt_client.Client()
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.connect(broker, port)
|
client.connect(broker, port)
|
||||||
@@ -29,13 +31,14 @@ def publish(client, topic, datasource):
|
|||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
logging.info(f"Published to {topic}: {msg[:50]}...")
|
logging.debug(f"Published to {topic}: {msg[:50]}...")
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status != 0:
|
if status != 0:
|
||||||
print(f"Failed to send message to topic {topic}")
|
logging.error(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
|
logging.basicConfig(level = logging.INFO)
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
|
|||||||
+1
-1
@@ -96,7 +96,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|||||||
created_records = [dict(row._mapping) for row in result.fetchall()]
|
created_records = [dict(row._mapping) for row in result.fetchall()]
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
for record in created_records:
|
for record in sorted(created_records, key = lambda x: x['timestamp']):
|
||||||
await send_data_to_subscribers(jsonable_encoder(record))
|
await send_data_to_subscribers(jsonable_encoder(record))
|
||||||
return created_records
|
return created_records
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
|||||||
Reference in New Issue
Block a user