mirror of
https://github.com/Rhinemann/IoT-Systems.git
synced 2026-03-14 20:50:39 +02:00
changed Batch size to 20
reworked method "on_message" implemented storeApiAdapter
This commit is contained in:
parent
ceffcfeac2
commit
24aeb1a19f
@ -14,11 +14,30 @@ class StoreApiAdapter(StoreGateway):
|
||||
self.api_base_url = api_base_url
|
||||
|
||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||
"""
|
||||
Save the processed road data to the Store API.
|
||||
Parameters:
|
||||
processed_agent_data_batch (dict): Processed road data to be saved.
|
||||
Returns:
|
||||
bool: True if the data is successfully saved, False otherwise.
|
||||
"""
|
||||
# Implement it
|
||||
if not processed_agent_data_batch:
|
||||
return False
|
||||
|
||||
# Extract user_id from the first element
|
||||
user_id = processed_agent_data_batch[0].agent_data.user_id
|
||||
|
||||
payload = {
|
||||
"data": [item.model_dump(mode='json') for item in processed_agent_data_batch],
|
||||
"user_id": user_id
|
||||
}
|
||||
|
||||
try:
|
||||
# Perform a POST request to the Store API with a 10-second timeout
|
||||
response = requests.post(
|
||||
f"{self.api_base_url}/processed_agent_data/",
|
||||
json=payload,
|
||||
timeout=10
|
||||
)
|
||||
if response.status_code == 200:
|
||||
logging.info(f"Batch of {len(processed_agent_data_batch)} items sent to Store.")
|
||||
return True
|
||||
else:
|
||||
logging.error(f"Store API error: {response.status_code} - {response.text}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send data to Store: {e}")
|
||||
return False
|
||||
|
||||
@ -93,7 +93,7 @@ services:
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: "processed_data_topic"
|
||||
BATCH_SIZE: 1
|
||||
BATCH_SIZE: 20
|
||||
ports:
|
||||
- "9000:8000"
|
||||
networks:
|
||||
|
||||
16
hub/main.py
16
hub/main.py
@ -70,18 +70,20 @@ def on_message(client, userdata, msg):
|
||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||
payload, strict=True
|
||||
)
|
||||
|
||||
redis_client.lpush(
|
||||
"processed_agent_data", processed_agent_data.model_dump_json()
|
||||
)
|
||||
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||
|
||||
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
|
||||
processed_agent_data_batch: List[ProcessedAgentData] = []
|
||||
for _ in range(BATCH_SIZE):
|
||||
processed_agent_data = ProcessedAgentData.model_validate_json(
|
||||
redis_client.lpop("processed_agent_data")
|
||||
)
|
||||
processed_agent_data_batch.append(processed_agent_data)
|
||||
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||
raw_data = redis_client.lpop("processed_agent_data")
|
||||
if raw_data:
|
||||
data_item = ProcessedAgentData.model_validate_json(raw_data)
|
||||
processed_agent_data_batch.append(data_item)
|
||||
|
||||
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
|
||||
|
||||
return {"status": "ok"}
|
||||
except Exception as e:
|
||||
logging.info(f"Error processing MQTT message: {e}")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user