Compare commits

..

1 Commits

Author SHA1 Message Date
cdc5c7981d [P] Add general smoke test and Store incremental test
Some checks failed
Component testing / Hub testing (push) Successful in 19s
Component testing / Store testing (push) Successful in 21s
Component testing / Integration smoke testing (push) Has been cancelled
2026-03-23 17:38:01 +02:00
19 changed files with 100 additions and 136 deletions

View File

@@ -1,16 +0,0 @@
name: Reset docker state
on: workflow_dispatch
jobs:
reset:
runs-on: host-arch-x86_64
name: Reset docker state
steps:
- name: Stop all containers
run: docker stop $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove all containers
run: docker rm $(docker ps -a | cut -d " " -f 1 | tail -n +2)
- name: Remove extra volumes
run: docker volume rm road_vision_postgres_data road_vision_pgadmin-data

View File

@@ -67,5 +67,4 @@ jobs:
- name: Clean up - name: Clean up
if: ${{always()}} if: ${{always()}}
working-directory: IoT-Systems
run: docker-compose down -v run: docker-compose down -v

View File

@@ -72,8 +72,8 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.longitude,
processed_agent_data.latitude, processed_agent_data.latitude,
processed_agent_data.longitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id processed_agent_data.user_id
) )

View File

@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: float x: int
y: float y: int
z: float z: int

View File

@@ -32,7 +32,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 0.1) Clock.schedule_interval(self.update, 5)
def update(self, *args): def update(self, *args):
""" """
@@ -87,8 +87,7 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon self.car_markers[user_id].lon = lon
if user_id == 1: self.mapview.center_on(lat, lon)
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):
if isinstance(point, dict): if isinstance(point, dict):

View File

@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1 USER_ID = 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883

View File

@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: float x: int
y: float y: int
z: float z: int

View File

@@ -15,7 +15,6 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -35,8 +34,6 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -163,14 +160,15 @@ class FileDatasource:
return row return row
def _parse_acc(self, row: List[str]) -> Accelerometer: @staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) / self.acc_divisor x = int(row[0])
y = int(row[1]) / self.acc_divisor y = int(row[1])
z = int(row[2]) / self.acc_divisor z = int(row[2])
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

View File

@@ -1,7 +1,6 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import logging
import config import config
@@ -16,8 +15,6 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -31,18 +28,16 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
logging.error(f"Failed to send message to topic {topic}") print(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)

View File

@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Float() x = fields.Int()
y = fields.Float() y = fields.Int()
z = fields.Float() z = fields.Int()

View File

@@ -22,7 +22,6 @@ services:
depends_on: depends_on:
- mqtt - mqtt
environment: environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: "agent_data_topic"
@@ -40,10 +39,9 @@ services:
environment: environment:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: " "
HUB_HOST: "hub" HUB_HOST: "store"
HUB_PORT: 8000 HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic" HUB_MQTT_TOPIC: "processed_data_topic"
@@ -91,7 +89,6 @@ services:
- postgres_db - postgres_db
restart: always restart: always
environment: environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user POSTGRES_USER: user
POSTGRES_PASSWORD: pass POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db POSTGRES_DB: test_db
@@ -123,7 +120,6 @@ services:
- redis - redis
- store - store
environment: environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store" STORE_API_HOST: "store"
STORE_API_PORT: 8000 STORE_API_PORT: 8000
REDIS_HOST: "redis" REDIS_HOST: "redis"

View File

@@ -13,7 +13,9 @@ class AgentMQTTAdapter(AgentGateway):
broker_port, broker_port,
topic, topic,
hub_gateway: HubGateway, hub_gateway: HubGateway,
batch_size=10,
): ):
self.batch_size = batch_size
# MQTT # MQTT
self.broker_host = broker_host self.broker_host = broker_host
self.broker_port = broker_port self.broker_port = broker_port
@@ -33,21 +35,42 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway""" """Processing agent data and sent it to hub gateway"""
try: try:
payload: str = msg.payload.decode("utf-8") payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True) agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data) processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if self.hub_gateway.save_data(processed_data): if not self.hub_gateway.save_data(processed_data):
logging.info("Processed data successfully forwarded to the Hub.") logging.error("Hub is not available")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e: except Exception as e:
logging.error(f"Error processing MQTT message: {e}") logging.info(f"Error processing MQTT message: {e}")
def connect(self): def connect(self):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60) self.client.connect(self.broker_host, self.broker_port, 60)
def loop_forever(self): def start(self):
self.client.loop_forever() self.client.loop_start()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")

View File

@@ -14,7 +14,6 @@ class GpsData(BaseModel):
class AgentData(BaseModel): class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData accelerometer: AccelerometerData
gps: GpsData gps: GpsData
timestamp: datetime timestamp: datetime

View File

@@ -26,8 +26,15 @@ class AgentGateway(ABC):
pass pass
@abstractmethod @abstractmethod
def loop_forever(self): def start(self):
""" """
Method to await for new messages. Method to start listening for messages from the agent.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
""" """
pass pass

View File

@@ -1,7 +1,6 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -13,24 +12,4 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
user_id = agent_data.user_id # Implement it
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)

View File

@@ -16,12 +16,9 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT # Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic" HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
# Configuration for the Hub # Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost" HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000 HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"

View File

@@ -10,51 +10,42 @@ from config import (
HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT, HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC, HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
) )
if __name__ == "__main__": if __name__ == "__main__":
# Configure logging settings # Configure logging settings
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[ handlers=[
logging.StreamHandler(), logging.StreamHandler(), # Output log messages to the console
logging.FileHandler("app.log"), logging.FileHandler("app.log"), # Save log messages to a file
], ],
) )
# Create an instance of the StoreApiAdapter using the configuration
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94) # hub_adapter = HubHttpAdapter(
# This allows easy switching between HTTP and MQTT protocols # api_base_url=HUB_URL,
if HUB_CONNECTION_TYPE.lower() == "http": # )
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)") hub_adapter = HubMqttAdapter(
hub_adapter = HubHttpAdapter( broker=HUB_MQTT_BROKER_HOST,
api_base_url=HUB_URL, port=HUB_MQTT_BROKER_PORT,
) topic=HUB_MQTT_TOPIC,
else: )
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)") # Create an instance of the AgentMQTTAdapter using the configuration
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter( agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST, broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT, broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC, topic=MQTT_TOPIC,
hub_gateway=hub_adapter, hub_gateway=hub_adapter,
) )
try: try:
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}") # Connect to the MQTT broker and start listening for messages
agent_adapter.connect() agent_adapter.connect()
agent_adapter.start()
logging.info("Broker connection success. Waiting for data...") # Keep the system running indefinitely (you can add other logic as needed)
agent_adapter.loop_forever() while True:
pass
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Interrupt signal received. Shutting down...") # Stop the MQTT adapter and exit gracefully if interrupted by the user
agent_adapter.disconnect() agent_adapter.stop()
logging.info("Disconnected from MQTT broker.") logging.info("System stopped.")

View File

@@ -33,7 +33,7 @@ processed_agent_data = Table(
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Set[WebSocket] = set() subscriptions: Dict[int, Set[WebSocket]] = {}
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,7 +41,10 @@ subscriptions: Set[WebSocket] = set()
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
subscriptions.add(websocket) if user_id not in subscriptions:
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
@@ -52,20 +55,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
for i in jsonable_data: await websocket.send_json(json.dumps(jsonable_data))
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions.remove(websocket) subscriptions[user_id].remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(data): async def send_data_to_subscribers(user_id: int, data):
for websocket in subscriptions: if user_id in subscriptions:
await websocket.send_json(json.dumps([data])) for websocket in subscriptions[user_id]:
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -97,7 +100,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
session.commit() session.commit()
for record in created_records: for record in created_records:
await send_data_to_subscribers(jsonable_encoder(record)) await send_data_to_subscribers(user_id, jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()

View File

@@ -1,7 +1,4 @@
import sys import sys
import os
print("Checking for dead containers...")
l = [i for i in sys.stdin.read().split("\n") if i] l = [i for i in sys.stdin.read().split("\n") if i]
header, statuses = l[0], l[1:] header, statuses = l[0], l[1:]
@@ -15,9 +12,6 @@ for i in statuses:
if not i[status_index:].startswith("Up "): if not i[status_index:].startswith("Up "):
service_name = i[name_index:] service_name = i[name_index:]
print(f"Crash detected in {service_name}") print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1 exit_code = 1
sys.exit(exit_code) sys.exit(1)