diff --git a/hub/.gitignore b/hub/.gitignore new file mode 100644 index 0000000..d75edea --- /dev/null +++ b/hub/.gitignore @@ -0,0 +1,2 @@ +venv +__pycache__ \ No newline at end of file diff --git a/hub/Dockerfile b/hub/Dockerfile new file mode 100644 index 0000000..ffd3cbb --- /dev/null +++ b/hub/Dockerfile @@ -0,0 +1,11 @@ +# Use the official Python image as the base image +FROM python:3.9-slim +# Set the working directory inside the container +WORKDIR /app +# Copy the requirements.txt file and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +# Copy the entire application into the container +COPY . . +# Run the main.py script inside the container when it starts +CMD ["uvicorn", "main:app", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/hub/README.md b/hub/README.md new file mode 100644 index 0000000..6479b6a --- /dev/null +++ b/hub/README.md @@ -0,0 +1,33 @@ +# Hub +## Instructions for Starting the Project +To start the Hub, follow these steps: +1. Clone the repository to your local machine: +```bash +git clone https://github.com/Toolf/hub.git +cd hub +``` +2. Create and activate a virtual environment (optional but recommended): +```bash +python -m venv venv +source venv/bin/activate # On Windows, use: venv\Scripts\activate +``` +3. Install the project dependencies: +```bash +pip install -r requirements.txt +``` +4. Run the system: +```bash +python ./app/main.py +``` +The system will start collecting data from the agent through MQTT and processing it. +## Running Tests +To run tests for the project, use the following command: +```bash +python -m unittest discover tests +``` +## Common Commands +### 1. Saving Requirements +To save the project dependencies to the requirements.txt file: +```bash +pip freeze > requirements.txt +``` \ No newline at end of file diff --git a/hub/app/adapters/store_api_adapter.py b/hub/app/adapters/store_api_adapter.py new file mode 100644 index 0000000..cd76be4 --- /dev/null +++ b/hub/app/adapters/store_api_adapter.py @@ -0,0 +1,24 @@ +import json +import logging +from typing import List + +import pydantic_core +import requests + +from app.entities.processed_agent_data import ProcessedAgentData +from app.interfaces.store_gateway import StoreGateway + + +class StoreApiAdapter(StoreGateway): + def __init__(self, api_base_url): + self.api_base_url = api_base_url + + def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]): + """ + Save the processed road data to the Store API. + Parameters: + processed_agent_data_batch (dict): Processed road data to be saved. + Returns: + bool: True if the data is successfully saved, False otherwise. + """ + # Implement it diff --git a/hub/app/entities/agent_data.py b/hub/app/entities/agent_data.py new file mode 100644 index 0000000..c2316ae --- /dev/null +++ b/hub/app/entities/agent_data.py @@ -0,0 +1,33 @@ +from datetime import datetime +from pydantic import BaseModel, field_validator + + +class AccelerometerData(BaseModel): + x: float + y: float + z: float + + +class GpsData(BaseModel): + latitude: float + longitude: float + + +class AgentData(BaseModel): + user_id: int + accelerometer: AccelerometerData + gps: GpsData + timestamp: datetime + + @classmethod + @field_validator('timestamp', mode='before') + def parse_timestamp(cls, value): + # Convert the timestamp to a datetime object + if isinstance(value, datetime): + return value + try: + return datetime.fromisoformat(value) + except (TypeError, ValueError): + raise ValueError( + "Invalid timestamp format. Expected ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)." + ) diff --git a/hub/app/entities/processed_agent_data.py b/hub/app/entities/processed_agent_data.py new file mode 100644 index 0000000..95accd4 --- /dev/null +++ b/hub/app/entities/processed_agent_data.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel +from app.entities.agent_data import AgentData + + +class ProcessedAgentData(BaseModel): + road_state: str + agent_data: AgentData diff --git a/hub/app/interfaces/store_gateway.py b/hub/app/interfaces/store_gateway.py new file mode 100644 index 0000000..b94cd2c --- /dev/null +++ b/hub/app/interfaces/store_gateway.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod +from typing import List +from app.entities.processed_agent_data import ProcessedAgentData + + +class StoreGateway(ABC): + """ + Abstract class representing the Store Gateway interface. + All store gateway adapters must implement these methods. + """ + + @abstractmethod + def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]) -> bool: + """ + Method to save the processed agent data in the database. + Parameters: + processed_agent_data_batch (ProcessedAgentData): The processed agent data to be saved. + Returns: + bool: True if the data is successfully saved, False otherwise. + """ + pass diff --git a/hub/config.py b/hub/config.py new file mode 100644 index 0000000..53c339b --- /dev/null +++ b/hub/config.py @@ -0,0 +1,26 @@ +import os + + +def try_parse_int(value: str): + try: + return int(value) + except Exception: + return None + + +# Configuration for the Store API +STORE_API_HOST = os.environ.get("STORE_API_HOST") or "localhost" +STORE_API_PORT = try_parse_int(os.environ.get("STORE_API_PORT")) or 8000 +STORE_API_BASE_URL = f"http://{STORE_API_HOST}:{STORE_API_PORT}" + +# Configure for Redis +REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost" +REDIS_PORT = try_parse_int(os.environ.get("REDIS_PORT")) or 6379 + +# Configure for hub logic +BATCH_SIZE = try_parse_int(os.environ.get("BATCH_SIZE")) or 20 + +# MQTT +MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "localhost" +MQTT_BROKER_PORT = try_parse_int(os.environ.get("MQTT_BROKER_PORT")) or 1883 +MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "processed_agent_data_topic" diff --git a/hub/docker/db/structure.sql b/hub/docker/db/structure.sql new file mode 100644 index 0000000..06b6f76 --- /dev/null +++ b/hub/docker/db/structure.sql @@ -0,0 +1,11 @@ +CREATE TABLE processed_agent_data ( + id SERIAL PRIMARY KEY, + road_state VARCHAR(255) NOT NULL, + user_id INTEGER NOT NULL, + x FLOAT, + y FLOAT, + z FLOAT, + latitude FLOAT, + longitude FLOAT, + timestamp TIMESTAMP +); \ No newline at end of file diff --git a/hub/docker/docker-compose.yaml b/hub/docker/docker-compose.yaml new file mode 100644 index 0000000..ce1072a --- /dev/null +++ b/hub/docker/docker-compose.yaml @@ -0,0 +1,111 @@ +version: "3.9" +name: "road_vision__hub" +services: + mqtt: + image: eclipse-mosquitto + container_name: mqtt + volumes: + - ./mosquitto:/mosquitto + - ./mosquitto/data:/mosquitto/data + - ./mosquitto/log:/mosquitto/log + ports: + - 1883:1883 + - 9001:9001 + networks: + mqtt_network: + + + postgres_db: + image: postgres:latest + container_name: postgres_db + restart: always + environment: + POSTGRES_USER: user + POSTGRES_PASSWORD: pass + POSTGRES_DB: test_db + volumes: + - postgres_data:/var/lib/postgresql/data + - ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql + ports: + - "5432:5432" + networks: + db_network: + + + pgadmin: + container_name: pgadmin4 + image: dpage/pgadmin4 + restart: always + environment: + PGADMIN_DEFAULT_EMAIL: admin@admin.com + PGADMIN_DEFAULT_PASSWORD: root + volumes: + - pgadmin-data:/var/lib/pgadmin + ports: + - "5050:80" + networks: + db_network: + + + store: + container_name: store + build: ../../store + depends_on: + - postgres_db + restart: always + environment: + POSTGRES_USER: user + POSTGRES_PASSWORD: pass + POSTGRES_DB: test_db + POSTGRES_HOST: postgres_db + POSTGRES_PORT: 5432 + ports: + - "8000:8000" + networks: + db_network: + hub_store: + + + redis: + image: redis:latest + container_name: redis + ports: + - "6379:6379" + networks: + hub_redis: + + + hub: + container_name: hub + build: ../ + depends_on: + - mqtt + - redis + - store + environment: + STORE_API_HOST: "store" + STORE_API_PORT: 8000 + REDIS_HOST: "redis" + REDIS_PORT: 6379 + MQTT_BROKER_HOST: "mqtt" + MQTT_BROKER_PORT: 1883 + MQTT_TOPIC: "processed_data_topic" + BATCH_SIZE: 1 + ports: + - "9000:8000" + networks: + mqtt_network: + hub_store: + hub_redis: + + +networks: + mqtt_network: + db_network: + hub_store: + hub_redis: + + +volumes: + postgres_data: + pgadmin-data: diff --git a/hub/docker/mosquitto/config/mosquitto.conf b/hub/docker/mosquitto/config/mosquitto.conf new file mode 100644 index 0000000..0394e8c --- /dev/null +++ b/hub/docker/mosquitto/config/mosquitto.conf @@ -0,0 +1,11 @@ +persistence true +persistence_location /mosquitto/data/ +listener 1883 +## Authentication ## +allow_anonymous true +# allow_anonymous false +# password_file /mosquitto/config/password.txt +## Log ## +log_dest file /mosquitto/log/mosquitto.log +log_dest stdout +# listener 1883 \ No newline at end of file diff --git a/hub/main.py b/hub/main.py new file mode 100644 index 0000000..1c2c0e9 --- /dev/null +++ b/hub/main.py @@ -0,0 +1,96 @@ +import logging +from typing import List + +from fastapi import FastAPI +from redis import Redis +import paho.mqtt.client as mqtt + +from app.adapters.store_api_adapter import StoreApiAdapter +from app.entities.processed_agent_data import ProcessedAgentData +from config import ( + STORE_API_BASE_URL, + REDIS_HOST, + REDIS_PORT, + BATCH_SIZE, + MQTT_TOPIC, + MQTT_BROKER_HOST, + MQTT_BROKER_PORT, +) + +# Configure logging settings +logging.basicConfig( + level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) + format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", + handlers=[ + logging.StreamHandler(), # Output log messages to the console + logging.FileHandler("app.log"), # Save log messages to a file + ], +) +# Create an instance of the Redis using the configuration +redis_client = Redis(host=REDIS_HOST, port=REDIS_PORT) +# Create an instance of the StoreApiAdapter using the configuration +store_adapter = StoreApiAdapter(api_base_url=STORE_API_BASE_URL) +# Create an instance of the AgentMQTTAdapter using the configuration + +# FastAPI +app = FastAPI() + + +@app.post("/processed_agent_data/") +async def save_processed_agent_data(processed_agent_data: ProcessedAgentData): + redis_client.lpush("processed_agent_data", processed_agent_data.model_dump_json()) + if redis_client.llen("processed_agent_data") >= BATCH_SIZE: + processed_agent_data_batch: List[ProcessedAgentData] = [] + for _ in range(BATCH_SIZE): + processed_agent_data = ProcessedAgentData.model_validate_json( + redis_client.lpop("processed_agent_data") + ) + processed_agent_data_batch.append(processed_agent_data) + print(processed_agent_data_batch) + store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch) + return {"status": "ok"} + + +# MQTT +client = mqtt.Client() + + +def on_connect(client, userdata, flags, rc): + if rc == 0: + logging.info("Connected to MQTT broker") + client.subscribe(MQTT_TOPIC) + else: + logging.info(f"Failed to connect to MQTT broker with code: {rc}") + + +def on_message(client, userdata, msg): + try: + payload: str = msg.payload.decode("utf-8") + # Create ProcessedAgentData instance with the received data + processed_agent_data = ProcessedAgentData.model_validate_json( + payload, strict=True + ) + + redis_client.lpush( + "processed_agent_data", processed_agent_data.model_dump_json() + ) + processed_agent_data_batch: List[ProcessedAgentData] = [] + if redis_client.llen("processed_agent_data") >= BATCH_SIZE: + for _ in range(BATCH_SIZE): + processed_agent_data = ProcessedAgentData.model_validate_json( + redis_client.lpop("processed_agent_data") + ) + processed_agent_data_batch.append(processed_agent_data) + store_adapter.save_data(processed_agent_data_batch=processed_agent_data_batch) + return {"status": "ok"} + except Exception as e: + logging.info(f"Error processing MQTT message: {e}") + + +# Connect +client.on_connect = on_connect +client.on_message = on_message +client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT) + +# Start +client.loop_start() diff --git a/hub/requirements.txt b/hub/requirements.txt new file mode 100644 index 0000000..69931b6 Binary files /dev/null and b/hub/requirements.txt differ diff --git a/hub/tests/test_agent_mqtt_adapter.py b/hub/tests/test_agent_mqtt_adapter.py new file mode 100644 index 0000000..b7594ce --- /dev/null +++ b/hub/tests/test_agent_mqtt_adapter.py @@ -0,0 +1,60 @@ +import unittest +from unittest.mock import Mock +import redis +from app.adapters.agent_mqtt_adapter import AgentMQTTAdapter +from app.interfaces.store_gateway import StoreGateway +from app.entities.agent_data import AccelerometerData, AgentData, GpsData +from app.usecases.data_processing import process_agent_data_batch + +class TestAgentMQTTAdapter(unittest.TestCase): + def setUp(self): + # Create a mock StoreGateway for testing + self.mock_store_gateway = Mock(spec=StoreGateway) + self.mock_redis = Mock(spec=redis.Redis) + # Create the AgentMQTTAdapter instance with the mock StoreGateway + self.agent_adapter = AgentMQTTAdapter( + broker_host="test_broker", + broker_port=1234, + topic="test_topic", + store_gateway=self.mock_store_gateway, + redis_client=self.mock_redis, + batch_size=1, + ) + def test_on_message_valid_data(self): + # Test handling of valid incoming MQTT message + # (Assuming data is in the correct JSON format) + valid_json_data = '{"user_id": 1,"accelerometer": {"x": 0.1, "y": 0.2, "z": 0.3}, "gps": {"latitude": 10.123, "longitude": 20.456}, "timestamp": "2023-07-21T12:34:56Z"}' + mock_msg = Mock(payload=valid_json_data.encode("utf-8")) + self.mock_redis.llen.return_value = 1 + self.mock_redis.rpop.return_value = valid_json_data + # Call on_message with the mock message + self.agent_adapter.on_message(None, None, mock_msg) + # Ensure that the store_gateway's save_data method is called once with the correct arguments + expected_agent_data = AgentData( + user_id=1, + accelerometer=AccelerometerData( + x=0.1, + y=0.2, + z=0.3, + ), + gps=GpsData( + latitude=10.123, + longitude=20.456, + ), + timestamp="2023-07-21T12:34:56Z", + ) + self.mock_store_gateway.save_data.assert_called_once_with( + process_agent_data_batch([expected_agent_data]) + ) + def test_on_message_invalid_data(self): + # Test handling of invalid incoming MQTT message + # (Assuming data is missing required fields or has incorrect format) + invalid_json_data = '{"user_id": 1, "accelerometer": {"x": 0.1, "y": 0.2}, "gps": {"latitude": 10.123}, "timestamp": 12345}' + mock_msg = Mock(payload=invalid_json_data.encode("utf-8")) + # Call on_message with the mock message + self.agent_adapter.on_message(None, None, mock_msg) + # Ensure that the store_gateway's save_data method is not called (due to invalid data) + self.mock_store_gateway.save_data.assert_not_called() + +if __name__ == "__main__": + unittest.main() diff --git a/hub/tests/test_store_api_adapter.py b/hub/tests/test_store_api_adapter.py new file mode 100644 index 0000000..83741f5 --- /dev/null +++ b/hub/tests/test_store_api_adapter.py @@ -0,0 +1,72 @@ +import requests +import unittest +from unittest.mock import Mock, patch +from app.adapters.store_api_adapter import StoreApiAdapter +from app.entities.agent_data import AccelerometerData, AgentData, GpsData +from app.entities.processed_agent_data import ProcessedAgentData + +class TestStoreApiAdapter(unittest.TestCase): + def setUp(self): + # Create the StoreApiAdapter instance + self.store_api_adapter = StoreApiAdapter(api_base_url="http://test-api.com") + @patch.object(requests, "post") + def test_save_data_success(self, mock_post): + # Test successful saving of data to the Store API + # Sample processed road data + agent_data = AgentData( + user_id=1, + accelerometer=AccelerometerData( + x=0.1, + y=0.2, + z=0.3, + ), + gps=GpsData( + latitude=10.123, + longitude=20.456, + ), + timestamp="2023-07-21T12:34:56Z", + ) + processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data) + # Mock the response from the Store API + mock_response = Mock(status_code=201) # 201 indicates successful creation + mock_post.return_value = mock_response + # Call the save_data method + result = self.store_api_adapter.save_data(processed_data) + # Ensure that the post method of the mock is called with the correct arguments + mock_post.assert_called_once_with( + "http://test-api.com/agent_data", json=processed_data.model_dump() + ) + # Ensure that the result is True, indicating successful saving + self.assertTrue(result) + @patch.object(requests, "post") + def test_save_data_failure(self, mock_post): + # Test failure to save data to the Store API + # Sample processed road data + agent_data = AgentData( + user_id=1, + accelerometer=AccelerometerData( + x=0.1, + y=0.2, + z=0.3, + ), + gps=GpsData( + latitude=10.123, + longitude=20.456, + ), + timestamp="2023-07-21T12:34:56Z", + ) + processed_data = ProcessedAgentData(road_state="normal", agent_data=agent_data) + # Mock the response from the Store API + mock_response = Mock(status_code=400) # 400 indicates a client error + mock_post.return_value = mock_response + # Call the save_data method + result = self.store_api_adapter.save_data(processed_data) + # Ensure that the post method of the mock is called with the correct arguments + mock_post.assert_called_once_with( + "http://test-api.com/agent_data", json=processed_data.model_dump() + ) + # Ensure that the result is False, indicating failure to save + self.assertFalse(result) + +if __name__ == "__main__": + unittest.main()