add: hub template

This commit is contained in:
Toolf 2024-02-12 18:18:38 +02:00
parent abd6bf0abe
commit 173a61d117
15 changed files with 518 additions and 0 deletions

2
hub/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
venv
__pycache__

11
hub/Dockerfile Normal file
View File

@ -0,0 +1,11 @@
# Use the official Python image as the base image
FROM python:3.9-slim
# Set the working directory inside the container
WORKDIR /app
# Copy the requirements.txt file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire application into the container
COPY . .
# Run the main.py script inside the container when it starts
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]

33
hub/README.md Normal file
View File

@ -0,0 +1,33 @@
# Hub
## Instructions for Starting the Project
To start the Hub, follow these steps:
1. Clone the repository to your local machine:
```bash
git clone https://github.com/Toolf/hub.git
cd hub
```
2. Create and activate a virtual environment (optional but recommended):
```bash
python -m venv venv
source venv/bin/activate # On Windows, use: venv\Scripts\activate
```
3. Install the project dependencies:
```bash
pip install -r requirements.txt
```
4. Run the system:
```bash
python ./app/main.py
```
The system will start collecting data from the agent through MQTT and processing it.
## Running Tests
To run tests for the project, use the following command:
```bash
python -m unittest discover tests
```
## Common Commands
### 1. Saving Requirements
To save the project dependencies to the requirements.txt file:
```bash
pip freeze > requirements.txt
```

View File

@ -0,0 +1,24 @@
import json
import logging
from typing import List
import pydantic_core
import requests
from app.entities.processed_agent_data import ProcessedAgentData
from app.interfaces.store_gateway import StoreGateway
class StoreApiAdapter(StoreGateway):
def __init__(self, api_base_url):
self.api_base_url = api_base_url
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
"""
Save the processed road data to the Store API.
Parameters:
processed_agent_data_batch (dict): Processed road data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
# Implement it

View File

@ -0,0 +1,33 @@
from datetime import datetime
from pydantic import BaseModel, field_validator
class AccelerometerData(BaseModel):
x: float
y: float
z: float
class GpsData(BaseModel):
latitude: float
longitude: float
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime
@classmethod
@field_validator('timestamp', mode='before')
def parse_timestamp(cls, value):
# Convert the timestamp to a datetime object
if isinstance(value, datetime):
return value
try:
return datetime.fromisoformat(value)
except (TypeError, ValueError):
raise ValueError(
"Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)."
)

View File

@ -0,0 +1,7 @@
from pydantic import BaseModel
from app.entities.agent_data import AgentData
class ProcessedAgentData(BaseModel):
road_state: str
agent_data: AgentData

View File

@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from typing import List
from app.entities.processed_agent_data import ProcessedAgentData
class StoreGateway(ABC):
"""
Abstract class representing the Store Gateway interface.
All store gateway adapters must implement these methods.
"""
@abstractmethod
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool:
"""
Method to save the processed agent data in the database.
Parameters:
processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved.
Returns:
bool: True if the data is successfully saved, False otherwise.
"""
pass

26
hub/config.py Normal file
View File

@ -0,0 +1,26 @@
import os
def try_parse_int(value: str):
try:
return int(value)
except Exception:
return None
# Configuration for the Store API
STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost"
STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000
STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}"
# Configure for Redis
REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379
# Configure for hub logic
BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20
# MQTT
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost"
MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883
MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic"

View File

@ -0,0 +1,11 @@
CREATE TABLE processed_agent_data (
id SERIAL PRIMARY KEY,
road_state VARCHAR(255) NOT NULL,
user_id INTEGER NOT NULL,
x FLOAT,
y FLOAT,
z FLOAT,
latitude FLOAT,
longitude FLOAT,
timestamp TIMESTAMP
);

View File

@ -0,0 +1,111 @@
version: "3.9"
name: "road_vision__hub"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
postgres_db:
image: postgres:latest
container_name: postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
ports:
- "5432:5432"
networks:
db_network:
pgadmin:
container_name: pgadmin4
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
volumes:
- pgadmin-data:/var/lib/pgadmin
ports:
- "5050:80"
networks:
db_network:
store:
container_name: store
build: ../../store
depends_on:
- postgres_db
restart: always
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
ports:
- "8000:8000"
networks:
db_network:
hub_store:
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
networks:
hub_redis:
hub:
container_name: hub
build: ../
depends_on:
- mqtt
- redis
- store
environment:
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"
REDIS_PORT: 6379
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "processed_data_topic"
BATCH_SIZE: 1
ports:
- "9000:8000"
networks:
mqtt_network:
hub_store:
hub_redis:
networks:
mqtt_network:
db_network:
hub_store:
hub_redis:
volumes:
postgres_data:
pgadmin-data:

View File

@ -0,0 +1,11 @@
persistence true
persistence_location /mosquitto/data/
listener 1883
## Authentication ##
allow_anonymous true
# allow_anonymous false
# password_file /mosquitto/config/password.txt
## Log ##
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
# listener 1883

96
hub/main.py Normal file
View File

@ -0,0 +1,96 @@
import logging
from typing import List
from fastapi import FastAPI
from redis import Redis
import paho.mqtt.client as mqtt
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.processed_agent_data import ProcessedAgentData
from config import (
STORE_API_BASE_URL,
REDIS_HOST,
REDIS_PORT,
BATCH_SIZE,
MQTT_TOPIC,
MQTT_BROKER_HOST,
MQTT_BROKER_PORT,
)
# Configure logging settings
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Create an instance of the Redis using the configuration
redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT)
# Create an instance of the StoreApiAdapter using the configuration
store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL)
# Create an instance of the AgentMQTTAdapter using the configuration
# FastAPI
app = FastAPI()
@app.post("/processed_agent_data/")
async def save_processed_agent_data(processed_agent_data: ProcessedAgentData):
redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json())
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
processed_agent_data_batch: List[ProcessedAgentData] = []
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
print(processed_agent_data_batch)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
# MQTT
client = mqtt.Client()
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker")
client.subscribe(MQTT_TOPIC)
else:
logging.info(f"Failed to connect to MQTT broker with code: {rc}")
def on_message(client, userdata, msg):
try:
payload: str = msg.payload.decode("utf-8")
# Create ProcessedAgentData instance with the received data
processed_agent_data = ProcessedAgentData.model_validate_json(
payload, strict=True
)
redis_client.lpush(
"processed_agent_data", processed_agent_data.model_dump_json()
)
processed_agent_data_batch: List[ProcessedAgentData] = []
if redis_client.llen("processed_agent_data") >= BATCH_SIZE:
for _ in range(BATCH_SIZE):
processed_agent_data = ProcessedAgentData.model_validate_json(
redis_client.lpop("processed_agent_data")
)
processed_agent_data_batch.append(processed_agent_data)
store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch)
return {"status": "ok"}
except Exception as e:
logging.info(f"Error processing MQTT message: {e}")
# Connect
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
# Start
client.loop_start()

BIN
hub/requirements.txt Normal file

Binary file not shown.

View File

@ -0,0 +1,60 @@
import unittest
from unittest.mock import Mock
import redis
from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter
from app.interfaces.store_gateway import StoreGateway
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.usecases.data_processing import process_agent_data_batch
class TestAgentMQTTAdapter(unittest.TestCase):
def setUp(self):
# Create a mock StoreGateway for testing
self.mock_store_gateway = Mock(spec=StoreGateway)
self.mock_redis = Mock(spec=redis.Redis)
# Create the AgentMQTTAdapter instance with the mock StoreGateway
self.agent_adapter = AgentMQTTAdapter(
broker_host="test_broker",
broker_port=1234,
topic="test_topic",
store_gateway=self.mock_store_gateway,
redis_client=self.mock_redis,
batch_size=1,
)
def test_on_message_valid_data(self):
# Test handling of valid incoming MQTT message
# (Assuming data is in the correct JSON format)
valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}'
mock_msg = Mock(payload=valid_json_data.encode("utf-8"))
self.mock_redis.llen.return_value = 1
self.mock_redis.rpop.return_value = valid_json_data
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is called once with the correct arguments
expected_agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
self.mock_store_gateway.save_data.assert_called_once_with(
process_agent_data_batch([expected_agent_data])
)
def test_on_message_invalid_data(self):
# Test handling of invalid incoming MQTT message
# (Assuming data is missing required fields or has incorrect format)
invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}'
mock_msg = Mock(payload=invalid_json_data.encode("utf-8"))
# Call on_message with the mock message
self.agent_adapter.on_message(None, None, mock_msg)
# Ensure that the store_gateway's save_data method is not called (due to invalid data)
self.mock_store_gateway.save_data.assert_not_called()
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,72 @@
import requests
import unittest
from unittest.mock import Mock, patch
from app.adapters.store_api_adapter import StoreApiAdapter
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
from app.entities.processed_agent_data import ProcessedAgentData
class TestStoreApiAdapter(unittest.TestCase):
def setUp(self):
# Create the StoreApiAdapter instance
self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com")
@patch.object(requests, "post")
def test_save_data_success(self, mock_post):
# Test successful saving of data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=201) # 201 indicates successful creation
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is True, indicating successful saving
self.assertTrue(result)
@patch.object(requests, "post")
def test_save_data_failure(self, mock_post):
# Test failure to save data to the Store API
# Sample processed road data
agent_data = AgentData(
user_id=1,
accelerometer=AccelerometerData(
x=0.1,
y=0.2,
z=0.3,
),
gps=GpsData(
latitude=10.123,
longitude=20.456,
),
timestamp="2023-07-21T12:34:56Z",
)
processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data)
# Mock the response from the Store API
mock_response = Mock(status_code=400) # 400 indicates a client error
mock_post.return_value = mock_response
# Call the save_data method
result = self.store_api_adapter.save_data(processed_data)
# Ensure that the post method of the mock is called with the correct arguments
mock_post.assert_called_once_with(
"http://test-api.com/agent_data", json=processed_data.model_dump()
)
# Ensure that the result is False, indicating failure to save
self.assertFalse(result)
if __name__ == "__main__":
unittest.main()