Compare commits

..

1 Commits

Author SHA1 Message Date
16b8fe9d56 [P] Add general smoke test and Store incremental test
Some checks failed
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Failing after 1m23s
2026-03-23 17:47:42 +02:00
19 changed files with 103 additions and 136 deletions

View File

@@ -1,16 +0,0 @@
name: Reset docker state
on: workflow_dispatch
jobs:
reset:
runs-on: host-arch-x86_64
name: Reset docker state
steps:
- name: Stop all containers
run: docker stop $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove all containers
run: docker rm $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove extra volumes
run: docker volume rm road_vision_postgres_data road_vision_pgadmin-data

View File

@@ -50,9 +50,9 @@ jobs:
- name: Clone repository
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build all production containers
working-directory: IoT-Systems
run: docker-compose build
#- name: Build all production containers
#working-directory: IoT-Systems
#run: docker-compose build
- name: Start all production containers
working-directory: IoT-Systems

View File

@@ -72,8 +72,8 @@ class Datasource:
)
new_points = [
(
processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.longitude,
processed_agent_data.road_state,
processed_agent_data.user_id
)

View File

@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass
class Accelerometer:
x: float
y: float
z: float
x: int
y: int
z: int

View File

@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи
"""
self.update()
Clock.schedule_interval(self.update, 0.1)
Clock.schedule_interval(self.update, 5)
def update(self, *args):
"""
@@ -87,7 +87,6 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon
if user_id == 1:
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point):

View File

@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
USER_ID = 1
# MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass
class Accelerometer:
x: float
y: float
z: float
x: int
y: int
z: int

View File

@@ -15,7 +15,6 @@ class FileDatasource:
def __init__(
self,
acc_divisor: float,
accelerometer_filename: str,
gps_filename: str,
park_filename: str,
@@ -35,8 +34,6 @@ class FileDatasource:
self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs):
"""Must be called before read()"""
if self._started:
@@ -163,14 +160,15 @@ class FileDatasource:
return row
def _parse_acc(self, row: List[str]) -> Accelerometer:
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try:
x = int(row[0]) / self.acc_divisor
y = int(row[1]) / self.acc_divisor
z = int(row[2]) / self.acc_divisor
x = int(row[0])
y = int(row[1])
z = int(row[2])
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -1,7 +1,6 @@
from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import logging
import config
@@ -16,8 +15,6 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client()
client.on_connect = on_connect
client.connect(broker, port)
@@ -31,18 +28,16 @@ def publish(client, topic, datasource):
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0]
if status != 0:
logging.error(f"Failed to send message to topic {topic}")
print(f"Failed to send message to topic {topic}")
def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema):
x = fields.Float()
y = fields.Float()
z = fields.Float()
x = fields.Int()
y = fields.Int()
z = fields.Int()

View File

@@ -22,7 +22,6 @@ services:
depends_on:
- mqtt
environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
@@ -40,10 +39,9 @@ services:
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "hub"
MQTT_TOPIC: " "
HUB_HOST: "store"
HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic"
@@ -91,7 +89,6 @@ services:
- postgres_db
restart: always
environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db
@@ -123,7 +120,6 @@ services:
- redis
- store
environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store"
STORE_API_PORT: 8000
REDIS_HOST: "redis"

View File

@@ -13,7 +13,9 @@ class AgentMQTTAdapter(AgentGateway):
broker_port,
topic,
hub_gateway: HubGateway,
batch_size=10,
):
self.batch_size = batch_size
# MQTT
self.broker_host = broker_host
self.broker_port = broker_port
@@ -33,21 +35,42 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway"""
try:
payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data)
if self.hub_gateway.save_data(processed_data):
logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available")
except Exception as e:
logging.error(f"Error processing MQTT message: {e}")
logging.info(f"Error processing MQTT message: {e}")
def connect(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60)
def loop_forever(self):
self.client.loop_forever()
def start(self):
self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")

View File

@@ -14,7 +14,6 @@ class GpsData(BaseModel):
class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData
gps: GpsData
timestamp: datetime

View File

@@ -26,8 +26,15 @@ class AgentGateway(ABC):
pass
@abstractmethod
def loop_forever(self):
def start(self):
"""
Method to await for new messages.
Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
"""
pass

View File

@@ -1,7 +1,6 @@
from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data(
agent_data: AgentData,
@@ -13,24 +12,4 @@ def process_agent_data(
Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
"""
user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)
# Implement it

View File

@@ -16,12 +16,9 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
# Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"

View File

@@ -10,51 +10,42 @@ from config import (
HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
)
if __name__ == "__main__":
# Configure logging settings
logging.basicConfig(
level=logging.INFO,
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("app.log"),
logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), # Save log messages to a file
],
)
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# This allows easy switching between HTTP and MQTT protocols
if HUB_CONNECTION_TYPE.lower() == "http":
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
hub_adapter = HubHttpAdapter(
api_base_url=HUB_URL,
)
else:
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter(
# api_base_url=HUB_URL,
# )
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
# Create an instance of the AgentMQTTAdapter using the configuration
agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC,
hub_gateway=hub_adapter,
)
try:
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
# Connect to the MQTT broker and start listening for messages
agent_adapter.connect()
logging.info("Broker connection success. Waiting for data...")
agent_adapter.loop_forever()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed)
while True:
pass
except KeyboardInterrupt:
logging.info("Interrupt signal received. Shutting down...")
agent_adapter.disconnect()
logging.info("Disconnected from MQTT broker.")
# Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.stop()
logging.info("System stopped.")

View File

@@ -33,7 +33,7 @@ processed_agent_data = Table(
)
# WebSocket subscriptions
subscriptions: Set[WebSocket] = set()
subscriptions: Dict[int, Set[WebSocket]] = {}
# FastAPI WebSocket endpoint
@@ -41,7 +41,10 @@ subscriptions: Set[WebSocket] = set()
async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept()
subscriptions.add(websocket)
if user_id not in subscriptions:
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try:
# send already available data
@@ -52,20 +55,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
await websocket.send_json(json.dumps(jsonable_data))
# receive forever
while True:
await websocket.receive_text()
except WebSocketDisconnect:
subscriptions.remove(websocket)
subscriptions[user_id].remove(websocket)
# Function to send data to subscribed users
async def send_data_to_subscribers(data):
for websocket in subscriptions:
await websocket.send_json(json.dumps([data]))
async def send_data_to_subscribers(user_id: int, data):
if user_id in subscriptions:
for websocket in subscriptions[user_id]:
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints
@@ -97,7 +100,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit()
for record in created_records:
await send_data_to_subscribers(jsonable_encoder(record))
await send_data_to_subscribers(user_id, jsonable_encoder(record))
return created_records
except Exception as err:
session.rollback()

View File

@@ -1,5 +1,4 @@
import sys
import os
print("Checking for dead containers...")
@@ -15,9 +14,6 @@ for i in statuses:
if not i[status_index:].startswith("Up "):
service_name = i[name_index:]
print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1
sys.exit(exit_code)
sys.exit(1)