[P] Add CI for updated Hub component part
Some checks failed
Hub component testing / hub-test (push) Failing after 24s

This commit is contained in:
2026-03-23 15:19:19 +02:00
parent 30f81ec1ae
commit 8d6889cad9
5 changed files with 76 additions and 1 deletions

12
hub/Dockerfile-test Normal file
View File

@@ -0,0 +1,12 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY hub/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY hub/. .
# Run the main.py script inside the container when it starts
CMD ["./test-entry.sh"]

View File

@@ -13,7 +13,7 @@ class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
def processed_agent_data_batch_to_json(self, processed_agent_data_batch: List[ProcessedAgentData]):
if not processed_agent_data_batch:
return False
@@ -25,6 +25,14 @@ class StoreApiAdapter(StoreGateway):
"user_id": user_id
}
return payload
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
payload = self.processed_agent_data_batch_to_json(processed_agent_data_batch)
if payload == False:
return False
try:
# Perform a POST request to the Store API with a 10-second timeout
response = requests.post(

View File

@@ -0,0 +1,31 @@
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
def test_processed_agent_data_batch_to_json():
processed_data_batch = [
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 1,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z"),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 2,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z"),
ProcessedAgentData(road_state = "normal",
agent_data = AgentData(user_id = 3,
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
gps = GpsData(latitude = 10.123, longitude = 20.456),
timestamp = "2023-07-21T12:34:56Z"),
]
res = json.loads(StoreApiAdapter(None).processed_agent_data_batch_to_json(processed_data_batch))
assert res[0].agent_data.user_id == 1
assert res[1].agent_data.user_id == 2
assert res[2].agent_data.user_id == 3
if __name__ == "__main__":
test_processed_agent_data_batch_to_json()

3
hub/test-entry.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/sh
PYTHONPATH=$PWD python3 app/adapters/store_api_adapter_test.py