diff --git a/hub/app/adapters/store_api_adapter.py b/hub/app/adapters/store_api_adapter.py index cd76be4..8ce4945 100644 --- a/hub/app/adapters/store_api_adapter.py +++ b/hub/app/adapters/store_api_adapter.py @@ -14,11 +14,30 @@ class StoreApiAdapter(StoreGateway): self.api_base_url = api_base_url def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]): - """ - Save the processed road data to the Store API. - Parameters: - processed_agent_data_batch (dict): Processed road data to be saved. - Returns: - bool: True if the data is successfully saved, False otherwise. - """ - # Implement it + if not processed_agent_data_batch: + return False + + # Extract user_id from the first element + user_id = processed_agent_data_batch[0].agent_data.user_id + + payload = { + "data": [item.model_dump(mode='json') for item in processed_agent_data_batch], + "user_id": user_id + } + + try: + # Perform a POST request to the Store API with a 10-second timeout + response = requests.post( + f"{self.api_base_url}/processed_agent_data/", + json=payload, + timeout=10 + ) + if response.status_code == 200: + logging.info(f"Batch of {len(processed_agent_data_batch)} items sent to Store.") + return True + else: + logging.error(f"Store API error: {response.status_code} - {response.text}") + return False + except Exception as e: + logging.error(f"Failed to send data to Store: {e}") + return False diff --git a/hub/docker/docker-compose.yaml b/hub/docker/docker-compose.yaml index 639dbd4..4964dda 100644 --- a/hub/docker/docker-compose.yaml +++ b/hub/docker/docker-compose.yaml @@ -93,7 +93,7 @@ services: MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_PORT: 1883 MQTT_TOPIC: "processed_data_topic" - BATCH_SIZE: 1 + BATCH_SIZE: 20 ports: - "9000:8000" networks: diff --git a/hub/main.py b/hub/main.py index 1c2c0e9..53544fa 100644 --- a/hub/main.py +++ b/hub/main.py @@ -70,18 +70,20 @@ def on_message(client, userdata, msg): processed_agent_data = ProcessedAgentData.model_validate_json( payload, strict=True ) - redis_client.lpush( "processed_agent_data", processed_agent_data.model_dump_json() ) - processed_agent_data_batch: List[ProcessedAgentData] = [] + if redis_client.llen("processed_agent_data") >= BATCH_SIZE: + processed_agent_data_batch: List[ProcessedAgentData] = [] for _ in range(BATCH_SIZE): - processed_agent_data = ProcessedAgentData.model_validate_json( - redis_client.lpop("processed_agent_data") - ) - processed_agent_data_batch.append(processed_agent_data) - store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch) + raw_data = redis_client.lpop("processed_agent_data") + if raw_data: + data_item = ProcessedAgentData.model_validate_json(raw_data) + processed_agent_data_batch.append(data_item) + + store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch) + return {"status": "ok"} except Exception as e: logging.info(f"Error processing MQTT message: {e}")