Compare commits
46 Commits
lab5/grysh
...
92c91c2594
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92c91c2594 | ||
|
|
2e623c3a93 | ||
|
|
edd2360d88 | ||
|
|
8e9dcb50c1 | ||
|
|
e2c3cda458 | ||
|
|
780855c93f | ||
|
|
86d7b1aaf8 | ||
|
|
d627836e47 | ||
| 0c2392dc0b | |||
|
|
65f767d38e | ||
| 0695e3d092 | |||
|
|
d6e094e6c0 | ||
|
|
2167eb2960 | ||
|
|
38374a6723 | ||
| c08612f71a | |||
| bde51ca5e1 | |||
| a204bb1676 | |||
| 764fb77f27 | |||
|
|
a55fc17711 | ||
| b34e385128 | |||
| a8a0ef5e15 | |||
| 00b037a243 | |||
| d1b6c0eed1 | |||
| 5e890d4f03 | |||
| a8e50d0386 | |||
| 1b42be264d | |||
|
|
b12bdc334c | ||
|
|
e8ff1c6cbd | ||
|
|
ad70519f47 | ||
|
|
b10aec1020 | ||
|
|
c085a49c8c | ||
| 0b8d2eb18b | |||
| 2846130e4e | |||
| 30af132033 | |||
| 60a846d8b8 | |||
| fe6bb6ab3a | |||
|
|
30f81ec1ae | ||
| 1b6f47fa0d | |||
| b1e6ad7c94 | |||
|
|
1eddfd966b | ||
| 8af68d6dd9 | |||
| 63aca15824 | |||
| ee509f72a4 | |||
| da9fe69d4e | |||
| 1c856dca0e | |||
|
|
17738d07fe |
16
.gitea/workflows/reset-docker.yaml
Normal file
16
.gitea/workflows/reset-docker.yaml
Normal file
@@ -0,0 +1,16 @@
|
||||
name: Reset docker state
|
||||
on: workflow_dispatch
|
||||
|
||||
jobs:
|
||||
reset:
|
||||
runs-on: host-arch-x86_64
|
||||
name: Reset docker state
|
||||
steps:
|
||||
- name: Stop all containers
|
||||
run: docker stop $(docker ps -a | cut -d " " -f 1 | tail -n +2)
|
||||
|
||||
- name: Remove all containers
|
||||
run: docker rm $(docker ps -a | cut -d " " -f 1 | tail -n +2)
|
||||
|
||||
- name: Remove extra volumes
|
||||
run: docker volume rm road_vision_postgres_data road_vision_pgadmin-data
|
||||
71
.gitea/workflows/tests.yaml
Normal file
71
.gitea/workflows/tests.yaml
Normal file
@@ -0,0 +1,71 @@
|
||||
name: Component testing
|
||||
on: [push, workflow_dispatch]
|
||||
|
||||
jobs:
|
||||
hub-test:
|
||||
name: Hub testing
|
||||
runs-on: host-arch-x86_64
|
||||
steps:
|
||||
- name: Clone repository
|
||||
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||
|
||||
- name: Build Hub testing container
|
||||
working-directory: IoT-Systems
|
||||
run: docker build -t local/hub/${{gitea.sha}} -f hub/Dockerfile-test .
|
||||
|
||||
- name: Run Hub tests
|
||||
working-directory: IoT-Systems
|
||||
run: docker run --rm -it local/hub/${{gitea.sha}}
|
||||
|
||||
- name: Clean up containers
|
||||
if: ${{always()}}
|
||||
run: docker image rm local/hub/${{gitea.sha}}
|
||||
|
||||
store-test:
|
||||
name: Store testing
|
||||
runs-on: host-arch-x86_64
|
||||
steps:
|
||||
- name: Clone repository
|
||||
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||
|
||||
- name: Build Store testing container
|
||||
working-directory: IoT-Systems
|
||||
run: docker build -t local/store/${{gitea.sha}} -f store/Dockerfile-test .
|
||||
|
||||
- name: Run Store tests
|
||||
working-directory: IoT-Systems
|
||||
run: docker run --rm -it local/store/${{gitea.sha}}
|
||||
|
||||
- name: Clean up containers
|
||||
if: ${{always()}}
|
||||
run: docker image rm local/store/${{gitea.sha}}
|
||||
|
||||
integration-smoke-test:
|
||||
name: Integration smoke testing
|
||||
runs-on: host-arch-x86_64
|
||||
needs:
|
||||
- hub-test
|
||||
- store-test
|
||||
steps:
|
||||
- name: Clone repository
|
||||
run: git clone --revision ${{ gitea.sha }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||
|
||||
- name: Build all production containers
|
||||
working-directory: IoT-Systems
|
||||
run: docker-compose build
|
||||
|
||||
- name: Start all production containers
|
||||
working-directory: IoT-Systems
|
||||
run: docker-compose up -d
|
||||
|
||||
- name: Wait for crashes to happen
|
||||
run: sleep 30
|
||||
|
||||
- name: Check for dead containers
|
||||
working-directory: IoT-Systems
|
||||
run: docker ps -a | python3 utils/check-up.py
|
||||
|
||||
- name: Clean up
|
||||
if: ${{always()}}
|
||||
working-directory: IoT-Systems
|
||||
run: docker-compose down -v
|
||||
@@ -75,6 +75,7 @@ class Datasource:
|
||||
processed_agent_data.latitude,
|
||||
processed_agent_data.longitude,
|
||||
processed_agent_data.road_state,
|
||||
processed_agent_data.user_id
|
||||
)
|
||||
for processed_agent_data in processed_agent_data_list
|
||||
]
|
||||
|
||||
@@ -2,6 +2,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
|
||||
@@ -5,6 +5,14 @@ from kivy.clock import Clock
|
||||
from lineMapLayer import LineMapLayer
|
||||
from datasource import Datasource
|
||||
|
||||
line_layer_colors = [
|
||||
[1, 0, 0, 1],
|
||||
[1, 0.5, 0, 1],
|
||||
[0, 1, 0, 1],
|
||||
[0, 1, 1, 1],
|
||||
[0, 0, 1, 1],
|
||||
[1, 0, 1, 1],
|
||||
]
|
||||
|
||||
class MapViewApp(App):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -12,8 +20,8 @@ class MapViewApp(App):
|
||||
|
||||
self.mapview = None
|
||||
self.datasource = Datasource(user_id=1)
|
||||
self.line_layer = None
|
||||
self.car_marker = None
|
||||
self.line_layers = dict()
|
||||
self.car_markers = dict()
|
||||
|
||||
# додати необхідні змінні
|
||||
self.bump_markers = []
|
||||
@@ -23,7 +31,8 @@ class MapViewApp(App):
|
||||
"""
|
||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||
"""
|
||||
Clock.schedule_once(lambda dt: self.set_bump_marker((50.4501, 30.5234)), 0)
|
||||
self.update()
|
||||
Clock.schedule_interval(self.update, 0.1)
|
||||
|
||||
def update(self, *args):
|
||||
"""
|
||||
@@ -36,13 +45,17 @@ class MapViewApp(App):
|
||||
|
||||
for point in new_points:
|
||||
|
||||
lat, lon, road_state = point
|
||||
lat, lon, road_state, user_id = point
|
||||
|
||||
# Оновлює лінію маршрута
|
||||
self.line_layer.add_point((lat, lon))
|
||||
if user_id not in self.line_layers:
|
||||
self.line_layers[user_id] = LineMapLayer(color = line_layer_colors[user_id % len(line_layer_colors)])
|
||||
self.mapview.add_layer(self.line_layers[user_id])
|
||||
|
||||
self.line_layers[user_id].add_point((lat, lon))
|
||||
|
||||
# Оновлює маркер маниши
|
||||
self.update_car_marker((lat, lon))
|
||||
self.update_car_marker(lat, lon, user_id)
|
||||
|
||||
# Перевіряємо стан дороги
|
||||
self.check_road_quality(point)
|
||||
@@ -55,26 +68,24 @@ class MapViewApp(App):
|
||||
if len(point) < 3:
|
||||
return
|
||||
|
||||
lat, lon, road_state = point
|
||||
lat, lon, road_state, user_id = point
|
||||
|
||||
if road_state == "pothole":
|
||||
self.set_pothole_marker((lat, lon))
|
||||
elif road_state == "bump":
|
||||
self.set_bump_marker((lat, lon))
|
||||
|
||||
def update_car_marker(self, point):
|
||||
def update_car_marker(self, lat, lon, user_id):
|
||||
"""
|
||||
Оновлює відображення маркера машини на мапі
|
||||
:param point: GPS координати
|
||||
"""
|
||||
lat, lon = point[0], point[1]
|
||||
|
||||
if not hasattr(self, 'car_marker'):
|
||||
self.car_marker = MapMarker(lat=lat, lon=lon, source='./images/car')
|
||||
self.mapview.add_marker(self.car_marker)
|
||||
if user_id not in self.car_markers:
|
||||
self.car_markers[user_id] = MapMarker(lat=lat, lon=lon, source='./images/car.png')
|
||||
self.mapview.add_marker(self.car_markers[user_id])
|
||||
else:
|
||||
self.car_marker.lat = lat
|
||||
self.car_marker.lon = lon
|
||||
self.car_markers[user_id].lat = lat
|
||||
self.car_markers[user_id].lon = lon
|
||||
|
||||
self.mapview.center_on(lat, lon)
|
||||
|
||||
@@ -128,9 +139,6 @@ class MapViewApp(App):
|
||||
lon=30.5234
|
||||
)
|
||||
|
||||
self.line_layer = LineMapLayer()
|
||||
self.mapview.add_layer(self.line_layer)
|
||||
|
||||
return self.mapview
|
||||
|
||||
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
name: "road_vision"
|
||||
services:
|
||||
mqtt:
|
||||
image: eclipse-mosquitto
|
||||
container_name: mqtt
|
||||
volumes:
|
||||
- ./mosquitto:/mosquitto
|
||||
- ./mosquitto/data:/mosquitto/data
|
||||
- ./mosquitto/log:/mosquitto/log
|
||||
ports:
|
||||
- 1883:1883
|
||||
- 9001:9001
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
fake_agent:
|
||||
container_name: agent
|
||||
build:
|
||||
context: ../../
|
||||
dockerfile: agent/Dockerfile
|
||||
depends_on:
|
||||
- mqtt
|
||||
environment:
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: "agent_data_topic"
|
||||
DELAY: 0.1
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
networks:
|
||||
mqtt_network:
|
||||
@@ -8,7 +8,7 @@ def try_parse(type, value: str):
|
||||
return None
|
||||
|
||||
|
||||
USER_ID = 1
|
||||
USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
|
||||
# MQTT config
|
||||
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
|
||||
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
|
||||
|
||||
@@ -3,6 +3,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
|
||||
@@ -15,6 +15,7 @@ class FileDatasource:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
acc_divisor: float,
|
||||
accelerometer_filename: str,
|
||||
gps_filename: str,
|
||||
park_filename: str,
|
||||
@@ -34,6 +35,8 @@ class FileDatasource:
|
||||
|
||||
self._started = False
|
||||
|
||||
self.acc_divisor = acc_divisor
|
||||
|
||||
def startReading(self, *args, **kwargs):
|
||||
"""Must be called before read()"""
|
||||
if self._started:
|
||||
@@ -160,15 +163,14 @@ class FileDatasource:
|
||||
|
||||
return row
|
||||
|
||||
@staticmethod
|
||||
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||
def _parse_acc(self, row: List[str]) -> Accelerometer:
|
||||
if len(row) < 3:
|
||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||
|
||||
try:
|
||||
x = int(row[0])
|
||||
y = int(row[1])
|
||||
z = int(row[2])
|
||||
x = int(row[0]) / self.acc_divisor
|
||||
y = int(row[1]) / self.acc_divisor
|
||||
z = int(row[2]) / self.acc_divisor
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from paho.mqtt import client as mqtt_client
|
||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||
from file_datasource import FileDatasource
|
||||
import logging
|
||||
import config
|
||||
|
||||
|
||||
@@ -28,6 +29,7 @@ def publish(client, topic, datasource):
|
||||
data = datasource.read()
|
||||
msg = AggregatedDataSchema().dumps(data)
|
||||
result = client.publish(topic, msg)
|
||||
logging.info(f"Published to {topic}: {msg[:50]}...")
|
||||
status = result[0]
|
||||
if status != 0:
|
||||
print(f"Failed to send message to topic {topic}")
|
||||
@@ -37,7 +39,7 @@ def run():
|
||||
# Prepare mqtt client
|
||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||
# Prepare datasource
|
||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
# Infinity publish data
|
||||
publish(client, config.MQTT_TOPIC, datasource)
|
||||
|
||||
|
||||
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
|
||||
|
||||
|
||||
class AccelerometerSchema(Schema):
|
||||
x = fields.Int()
|
||||
y = fields.Int()
|
||||
z = fields.Int()
|
||||
x = fields.Float()
|
||||
y = fields.Float()
|
||||
z = fields.Float()
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
name: "road_vision__hub"
|
||||
name: "road_vision"
|
||||
services:
|
||||
mqtt:
|
||||
image: eclipse-mosquitto
|
||||
container_name: mqtt
|
||||
volumes:
|
||||
- ./mosquitto:/mosquitto
|
||||
- ./mosquitto/data:/mosquitto/data
|
||||
- ./mosquitto/log:/mosquitto/log
|
||||
- ./agent/docker/mosquitto:/mosquitto
|
||||
- ./agent/docker/mosquitto/data:/mosquitto/data
|
||||
- ./agent/docker/mosquitto/log:/mosquitto/log
|
||||
ports:
|
||||
- 1883:1883
|
||||
- 9001:9001
|
||||
@@ -14,6 +14,43 @@ services:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
fake_agent:
|
||||
container_name: agent
|
||||
build:
|
||||
context: .
|
||||
dockerfile: agent/Dockerfile
|
||||
depends_on:
|
||||
- mqtt
|
||||
environment:
|
||||
PYTHONUNBUFFERED: 1
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: "agent_data_topic"
|
||||
DELAY: 0.1
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
edge:
|
||||
container_name: edge
|
||||
build:
|
||||
context: .
|
||||
dockerfile: edge/Dockerfile
|
||||
depends_on:
|
||||
- mqtt
|
||||
environment:
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: "agent_data_topic"
|
||||
HUB_HOST: "hub"
|
||||
HUB_PORT: 8000
|
||||
HUB_CONNECTION_TYPE: "http"
|
||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||
HUB_MQTT_BROKER_PORT: 1883
|
||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||
networks:
|
||||
mqtt_network:
|
||||
edge_hub:
|
||||
|
||||
postgres_db:
|
||||
image: postgres:17
|
||||
container_name: postgres_db
|
||||
@@ -24,13 +61,12 @@ services:
|
||||
POSTGRES_DB: test_db
|
||||
volumes:
|
||||
- postgres_data:/var/lib/postgresql/data
|
||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||
- ./store/docker/db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||
ports:
|
||||
- "5432:5432"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
pgadmin:
|
||||
container_name: pgadmin4
|
||||
image: dpage/pgadmin4
|
||||
@@ -49,12 +85,13 @@ services:
|
||||
store:
|
||||
container_name: store
|
||||
build:
|
||||
context: ../../
|
||||
context: .
|
||||
dockerfile: store/Dockerfile
|
||||
depends_on:
|
||||
- postgres_db
|
||||
restart: always
|
||||
environment:
|
||||
PYTHONUNBUFFERED: 1
|
||||
POSTGRES_USER: user
|
||||
POSTGRES_PASSWORD: pass
|
||||
POSTGRES_DB: test_db
|
||||
@@ -79,13 +116,14 @@ services:
|
||||
hub:
|
||||
container_name: hub
|
||||
build:
|
||||
context: ../../
|
||||
context: .
|
||||
dockerfile: hub/Dockerfile
|
||||
depends_on:
|
||||
- mqtt
|
||||
- redis
|
||||
- store
|
||||
environment:
|
||||
PYTHONUNBUFFERED: 1
|
||||
STORE_API_HOST: "store"
|
||||
STORE_API_PORT: 8000
|
||||
REDIS_HOST: "redis"
|
||||
@@ -101,10 +139,11 @@ services:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
|
||||
networks:
|
||||
mqtt_network:
|
||||
db_network:
|
||||
edge_hub:
|
||||
hub:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
|
||||
broker_port,
|
||||
topic,
|
||||
hub_gateway: HubGateway,
|
||||
batch_size=10,
|
||||
):
|
||||
self.batch_size = batch_size
|
||||
# MQTT
|
||||
self.broker_host = broker_host
|
||||
self.broker_port = broker_port
|
||||
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
|
||||
"""Processing agent data and sent it to hub gateway"""
|
||||
try:
|
||||
payload: str = msg.payload.decode("utf-8")
|
||||
# Create AgentData instance with the received data
|
||||
|
||||
agent_data = AgentData.model_validate_json(payload, strict=True)
|
||||
# Process the received data (you can call a use case here if needed)
|
||||
processed_data = process_agent_data(agent_data)
|
||||
# Store the agent_data in the database (you can send it to the data processing module)
|
||||
if not self.hub_gateway.save_data(processed_data):
|
||||
logging.error("Hub is not available")
|
||||
|
||||
if self.hub_gateway.save_data(processed_data):
|
||||
logging.info("Processed data successfully forwarded to the Hub.")
|
||||
else:
|
||||
logging.error("Failed to send data: Hub gateway is unavailable.")
|
||||
except Exception as e:
|
||||
logging.info(f"Error processing MQTT message: {e}")
|
||||
logging.error(f"Error processing MQTT message: {e}")
|
||||
|
||||
def connect(self):
|
||||
self.client.on_connect = self.on_connect
|
||||
self.client.on_message = self.on_message
|
||||
self.client.connect(self.broker_host, self.broker_port, 60)
|
||||
|
||||
def start(self):
|
||||
self.client.loop_start()
|
||||
|
||||
def stop(self):
|
||||
self.client.loop_stop()
|
||||
|
||||
|
||||
# Usage example:
|
||||
if __name__ == "__main__":
|
||||
broker_host = "localhost"
|
||||
broker_port = 1883
|
||||
topic = "agent_data_topic"
|
||||
# Assuming you have implemented the StoreGateway and passed it to the adapter
|
||||
store_gateway = HubGateway()
|
||||
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
|
||||
adapter.connect()
|
||||
adapter.start()
|
||||
try:
|
||||
# Keep the adapter running in the background
|
||||
while True:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
adapter.stop()
|
||||
logging.info("Adapter stopped.")
|
||||
def loop_forever(self):
|
||||
self.client.loop_forever()
|
||||
|
||||
@@ -14,6 +14,7 @@ class GpsData(BaseModel):
|
||||
|
||||
|
||||
class AgentData(BaseModel):
|
||||
user_id: int
|
||||
accelerometer: AccelerometerData
|
||||
gps: GpsData
|
||||
timestamp: datetime
|
||||
|
||||
@@ -26,15 +26,8 @@ class AgentGateway(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def start(self):
|
||||
def loop_forever(self):
|
||||
"""
|
||||
Method to start listening for messages from the agent.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def stop(self):
|
||||
"""
|
||||
Method to stop the agent gateway and clean up resources.
|
||||
Method to await for new messages.
|
||||
"""
|
||||
pass
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from app.entities.agent_data import AgentData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
_last_detection_state = {}
|
||||
|
||||
def process_agent_data(
|
||||
agent_data: AgentData,
|
||||
@@ -12,4 +13,24 @@ def process_agent_data(
|
||||
Returns:
|
||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||
"""
|
||||
# Implement it
|
||||
user_id = agent_data.user_id
|
||||
road_state = "normal"
|
||||
|
||||
last_detection_state = _last_detection_state.get(user_id, False)
|
||||
|
||||
if (agent_data.accelerometer.z < 0.6):
|
||||
road_state = "pothole"
|
||||
elif (agent_data.accelerometer.z > 1.2):
|
||||
road_state = "bump"
|
||||
|
||||
detection_happened = road_state != "normal"
|
||||
|
||||
if not (not last_detection_state and detection_happened):
|
||||
road_state = "normal"
|
||||
|
||||
_last_detection_state[user_id] = detection_happened
|
||||
|
||||
return ProcessedAgentData(
|
||||
road_state=road_state,
|
||||
agent_data=agent_data
|
||||
)
|
||||
|
||||
@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
|
||||
# Configuration for hub MQTT
|
||||
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
|
||||
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
|
||||
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic"
|
||||
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
|
||||
|
||||
# Configuration for the Hub
|
||||
HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
|
||||
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000
|
||||
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
|
||||
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
|
||||
|
||||
# For choosing type of connection
|
||||
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"
|
||||
@@ -1,50 +0,0 @@
|
||||
version: "3.9"
|
||||
# name: "road_vision"
|
||||
services:
|
||||
mqtt:
|
||||
image: eclipse-mosquitto
|
||||
container_name: mqtt
|
||||
volumes:
|
||||
- ./mosquitto:/mosquitto
|
||||
- ./mosquitto/data:/mosquitto/data
|
||||
- ./mosquitto/log:/mosquitto/log
|
||||
ports:
|
||||
- 1883:1883
|
||||
- 19001:9001
|
||||
networks:
|
||||
mqtt_network:
|
||||
|
||||
|
||||
edge:
|
||||
container_name: edge
|
||||
build:
|
||||
context: ../../
|
||||
dockerfile: edge/Dockerfile
|
||||
depends_on:
|
||||
- mqtt
|
||||
environment:
|
||||
MQTT_BROKER_HOST: "mqtt"
|
||||
MQTT_BROKER_PORT: 1883
|
||||
MQTT_TOPIC: " "
|
||||
HUB_HOST: "store"
|
||||
HUB_PORT: 8000
|
||||
HUB_MQTT_BROKER_HOST: "mqtt"
|
||||
HUB_MQTT_BROKER_PORT: 1883
|
||||
HUB_MQTT_TOPIC: "processed_data_topic"
|
||||
networks:
|
||||
mqtt_network:
|
||||
edge_hub:
|
||||
|
||||
|
||||
networks:
|
||||
mqtt_network:
|
||||
db_network:
|
||||
edge_hub:
|
||||
hub:
|
||||
hub_store:
|
||||
hub_redis:
|
||||
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
pgadmin-data:
|
||||
51
edge/main.py
51
edge/main.py
@@ -10,42 +10,51 @@ from config import (
|
||||
HUB_MQTT_BROKER_HOST,
|
||||
HUB_MQTT_BROKER_PORT,
|
||||
HUB_MQTT_TOPIC,
|
||||
HUB_CONNECTION_TYPE,
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Configure logging settings
|
||||
logging.basicConfig(
|
||||
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs)
|
||||
level=logging.INFO,
|
||||
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Output log messages to the console
|
||||
logging.FileHandler("app.log"), # Save log messages to a file
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler("app.log"),
|
||||
],
|
||||
)
|
||||
# Create an instance of the StoreApiAdapter using the configuration
|
||||
# hub_adapter = HubHttpAdapter(
|
||||
# api_base_url=HUB_URL,
|
||||
# )
|
||||
hub_adapter = HubMqttAdapter(
|
||||
broker=HUB_MQTT_BROKER_HOST,
|
||||
port=HUB_MQTT_BROKER_PORT,
|
||||
topic=HUB_MQTT_TOPIC,
|
||||
)
|
||||
# Create an instance of the AgentMQTTAdapter using the configuration
|
||||
|
||||
# Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
|
||||
# This allows easy switching between HTTP and MQTT protocols
|
||||
if HUB_CONNECTION_TYPE.lower() == "http":
|
||||
logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
|
||||
hub_adapter = HubHttpAdapter(
|
||||
api_base_url=HUB_URL,
|
||||
)
|
||||
else:
|
||||
logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
|
||||
hub_adapter = HubMqttAdapter(
|
||||
broker=HUB_MQTT_BROKER_HOST,
|
||||
port=HUB_MQTT_BROKER_PORT,
|
||||
topic=HUB_MQTT_TOPIC,
|
||||
)
|
||||
|
||||
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
|
||||
# This adapter acts as a bridge between the Agent and the Hub
|
||||
agent_adapter = AgentMQTTAdapter(
|
||||
broker_host=MQTT_BROKER_HOST,
|
||||
broker_port=MQTT_BROKER_PORT,
|
||||
topic=MQTT_TOPIC,
|
||||
hub_gateway=hub_adapter,
|
||||
)
|
||||
|
||||
try:
|
||||
# Connect to the MQTT broker and start listening for messages
|
||||
logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
|
||||
agent_adapter.connect()
|
||||
agent_adapter.start()
|
||||
# Keep the system running indefinitely (you can add other logic as needed)
|
||||
while True:
|
||||
pass
|
||||
|
||||
logging.info("Broker connection success. Waiting for data...")
|
||||
agent_adapter.loop_forever()
|
||||
except KeyboardInterrupt:
|
||||
# Stop the MQTT adapter and exit gracefully if interrupted by the user
|
||||
agent_adapter.stop()
|
||||
logging.info("System stopped.")
|
||||
logging.info("Interrupt signal received. Shutting down...")
|
||||
agent_adapter.disconnect()
|
||||
logging.info("Disconnected from MQTT broker.")
|
||||
|
||||
12
hub/Dockerfile-test
Normal file
12
hub/Dockerfile-test
Normal file
@@ -0,0 +1,12 @@
|
||||
# Use the official Python image as the base image
|
||||
FROM python:3.9-slim
|
||||
# Set the working directory inside the container
|
||||
WORKDIR /app
|
||||
# Copy the requirements.txt file and install dependencies
|
||||
COPY hub/requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
# Copy the entire application into the container
|
||||
COPY hub/. .
|
||||
# Run the main.py script inside the container when it starts
|
||||
CMD ["./test-entry.sh"]
|
||||
@@ -13,7 +13,7 @@ class StoreApiAdapter(StoreGateway):
|
||||
def __init__(self, api_base_url):
|
||||
self.api_base_url = api_base_url
|
||||
|
||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||
def processed_agent_data_batch_to_payload(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||
if not processed_agent_data_batch:
|
||||
return False
|
||||
|
||||
@@ -25,6 +25,14 @@ class StoreApiAdapter(StoreGateway):
|
||||
"user_id": user_id
|
||||
}
|
||||
|
||||
return payload
|
||||
|
||||
def save_data(self, processed_agent_data_batch: List[ProcessedAgentData]):
|
||||
payload = self.processed_agent_data_batch_to_payload(processed_agent_data_batch)
|
||||
|
||||
if payload == False:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Perform a POST request to the Store API with a 10-second timeout
|
||||
response = requests.post(
|
||||
|
||||
41
hub/app/adapters/store_api_adapter_test.py
Normal file
41
hub/app/adapters/store_api_adapter_test.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from app.adapters.store_api_adapter import StoreApiAdapter
|
||||
from app.entities.agent_data import AccelerometerData, AgentData, GpsData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
def _test_processed_agent_data_batch_to_payload():
|
||||
processed_data_batch = [
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 1,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 2,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 3,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
]
|
||||
|
||||
res = StoreApiAdapter(None).processed_agent_data_batch_to_payload(processed_data_batch)
|
||||
|
||||
assert res["data"][0]["agent_data"]["user_id"] == 1
|
||||
assert res["data"][1]["agent_data"]["user_id"] == 2
|
||||
assert res["data"][2]["agent_data"]["user_id"] == 3
|
||||
|
||||
assert StoreApiAdapter(None).processed_agent_data_batch_to_payload([]) == False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_functions = [i for i in dir() if i.startswith('_test_')]
|
||||
|
||||
for i in test_functions:
|
||||
print(i)
|
||||
eval(i)()
|
||||
3
hub/test-entry.sh
Executable file
3
hub/test-entry.sh
Executable file
@@ -0,0 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
PYTHONPATH=$PWD python3 app/adapters/store_api_adapter_test.py
|
||||
13
store/Dockerfile-test
Normal file
13
store/Dockerfile-test
Normal file
@@ -0,0 +1,13 @@
|
||||
# Use the official Python image as the base image
|
||||
FROM python:latest
|
||||
# Set the working directory inside the container
|
||||
WORKDIR /app
|
||||
# Copy the requirements.txt file and install dependencies
|
||||
COPY store/requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
# Copy the entire application into the container
|
||||
COPY store/. .
|
||||
# Run the main.py script inside the container when it starts
|
||||
#CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
|
||||
CMD ["./test-entry.sh"]
|
||||
@@ -7,5 +7,6 @@ CREATE TABLE processed_agent_data (
|
||||
z FLOAT,
|
||||
latitude FLOAT,
|
||||
longitude FLOAT,
|
||||
timestamp TIMESTAMP
|
||||
);
|
||||
timestamp TIMESTAMP,
|
||||
visible BOOLEAN
|
||||
);
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
name: "road_vision__database"
|
||||
services:
|
||||
postgres_db:
|
||||
image: postgres:17
|
||||
container_name: postgres_db
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_USER: user
|
||||
POSTGRES_PASSWORD: pass
|
||||
POSTGRES_DB: test_db
|
||||
volumes:
|
||||
- postgres_data:/var/lib/postgresql/data
|
||||
- ./db/structure.sql:/docker-entrypoint-initdb.d/structure.sql
|
||||
ports:
|
||||
- "5432:5432"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
pgadmin:
|
||||
container_name: pgadmin4
|
||||
image: dpage/pgadmin4
|
||||
restart: always
|
||||
environment:
|
||||
PGADMIN_DEFAULT_EMAIL: admin@admin.com
|
||||
PGADMIN_DEFAULT_PASSWORD: root
|
||||
volumes:
|
||||
- pgadmin-data:/var/lib/pgadmin
|
||||
ports:
|
||||
- "5050:80"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
store:
|
||||
container_name: store
|
||||
build:
|
||||
context: ../../
|
||||
dockerfile: store/Dockerfile
|
||||
depends_on:
|
||||
- postgres_db
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_USER: user
|
||||
POSTGRES_PASSWORD: pass
|
||||
POSTGRES_DB: test_db
|
||||
POSTGRES_HOST: postgres_db
|
||||
POSTGRES_PORT: 5432
|
||||
ports:
|
||||
- "8000:8000"
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
networks:
|
||||
db_network:
|
||||
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
pgadmin-data:
|
||||
@@ -8,12 +8,13 @@ from sqlalchemy import (
|
||||
Integer,
|
||||
String,
|
||||
Float,
|
||||
Boolean,
|
||||
DateTime,
|
||||
)
|
||||
from sqlalchemy.sql import select
|
||||
|
||||
from database import metadata, SessionLocal
|
||||
from schemas import ProcessedAgentData, ProcessedAgentDataInDB
|
||||
from schemas import ProcessedAgentData, ProcessedAgentDataInDB, WebSocketData
|
||||
|
||||
# FastAPI app setup
|
||||
app = FastAPI()
|
||||
@@ -30,60 +31,93 @@ processed_agent_data = Table(
|
||||
Column("latitude", Float),
|
||||
Column("longitude", Float),
|
||||
Column("timestamp", DateTime),
|
||||
Column("visible", Boolean),
|
||||
)
|
||||
|
||||
# WebSocket subscriptions
|
||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||
subscriptions: Set[WebSocket] = set()
|
||||
|
||||
|
||||
# FastAPI WebSocket endpoint
|
||||
@app.websocket("/ws/{user_id}")
|
||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||
await websocket.accept()
|
||||
if user_id not in subscriptions:
|
||||
subscriptions[user_id] = set()
|
||||
subscriptions[user_id].add(websocket)
|
||||
|
||||
subscriptions.add(websocket)
|
||||
|
||||
try:
|
||||
# send already available data
|
||||
r = processed_agent_data.select()
|
||||
stored_data = SessionLocal().execute(r).fetchall()
|
||||
|
||||
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
|
||||
for i in jsonable_data:
|
||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
for i in jsonable_data:
|
||||
await websocket.send_json(json.dumps([i]))
|
||||
|
||||
# receive forever
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
data = await websocket.receive_text()
|
||||
try:
|
||||
if (data):
|
||||
ws_data = WebSocketData.model_validate(json.loads(data))
|
||||
session = SessionLocal()
|
||||
update_query = (
|
||||
processed_agent_data.update()
|
||||
.where(processed_agent_data.c.id == ws_data.id)
|
||||
.values(visible=False)
|
||||
).returning(processed_agent_data)
|
||||
res = session.execute(update_query).fetchone()
|
||||
if (not res):
|
||||
session.rollback()
|
||||
raise Exception("Error while websocket PUT")
|
||||
session.commit()
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
except WebSocketDisconnect:
|
||||
subscriptions[user_id].remove(websocket)
|
||||
subscriptions.remove(websocket)
|
||||
|
||||
|
||||
# Function to send data to subscribed users
|
||||
async def send_data_to_subscribers(user_id: int, data):
|
||||
if user_id in subscriptions:
|
||||
for websocket in subscriptions[user_id]:
|
||||
await websocket.send_json(json.dumps(data))
|
||||
async def send_data_to_subscribers(data):
|
||||
for websocket in subscriptions:
|
||||
await websocket.send_json(json.dumps([data]))
|
||||
|
||||
|
||||
# FastAPI CRUDL endpoints
|
||||
|
||||
|
||||
@app.post("/processed_agent_data/")
|
||||
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
|
||||
session = SessionLocal()
|
||||
try:
|
||||
created_data = [
|
||||
def ProcessedAgentData_to_td(data: List[ProcessedAgentData]):
|
||||
return [
|
||||
{
|
||||
"road_state": item.road_state,
|
||||
"user_id": user_id,
|
||||
"user_id": item.agent_data.user_id,
|
||||
"x": item.agent_data.accelerometer.x,
|
||||
"y": item.agent_data.accelerometer.y,
|
||||
"z": item.agent_data.accelerometer.z,
|
||||
"latitude": item.agent_data.gps.latitude,
|
||||
"longitude": item.agent_data.gps.longitude,
|
||||
"timestamp": item.agent_data.timestamp,
|
||||
"visible": True,
|
||||
}
|
||||
for item in data
|
||||
]
|
||||
|
||||
|
||||
@app.post("/processed_agent_data/")
|
||||
async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: int = Body(..., embed=True)):
|
||||
session = SessionLocal()
|
||||
try:
|
||||
created_data = ProcessedAgentData_to_td(data)
|
||||
stmt = processed_agent_data.insert().values(created_data).returning(processed_agent_data)
|
||||
result = session.execute(stmt)
|
||||
created_records = [dict(row._mapping) for row in result.fetchall()]
|
||||
session.commit()
|
||||
|
||||
for record in created_records:
|
||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
||||
await send_data_to_subscribers(jsonable_encoder(record))
|
||||
return created_records
|
||||
except Exception as err:
|
||||
session.rollback()
|
||||
|
||||
@@ -49,3 +49,7 @@ class AgentData(BaseModel):
|
||||
class ProcessedAgentData(BaseModel):
|
||||
road_state: str
|
||||
agent_data: AgentData
|
||||
|
||||
class WebSocketData(BaseModel):
|
||||
id: int
|
||||
|
||||
|
||||
3
store/test-entry.sh
Executable file
3
store/test-entry.sh
Executable file
@@ -0,0 +1,3 @@
|
||||
#!/bin/sh
|
||||
|
||||
PYTHONPATH=$PWD python3 test/main_test.py
|
||||
39
store/test/main_test.py
Normal file
39
store/test/main_test.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from schemas import AccelerometerData, AgentData, GpsData, ProcessedAgentData
|
||||
|
||||
import main
|
||||
|
||||
def _test_ProcessedAgentData_to_td():
|
||||
processed_data_batch = [
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 1,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 2,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
ProcessedAgentData(road_state = "normal",
|
||||
agent_data = AgentData(user_id = 3,
|
||||
accelerometer = AccelerometerData(x = 0.1, y = 0.2, z = 0.3),
|
||||
gps = GpsData(latitude = 10.123, longitude = 20.456),
|
||||
timestamp = "2023-07-21T12:34:56Z")
|
||||
),
|
||||
]
|
||||
|
||||
res = main.ProcessedAgentData_to_td(processed_data_batch)
|
||||
|
||||
assert res[0]["user_id"] == 1
|
||||
assert res[1]["user_id"] == 2
|
||||
assert res[2]["user_id"] == 3
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_functions = [i for i in dir() if i.startswith('_test_')]
|
||||
|
||||
for i in test_functions:
|
||||
print(i)
|
||||
eval(i)()
|
||||
19
utils/check-up.py
Normal file
19
utils/check-up.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import sys
|
||||
|
||||
print("Checking for dead containers...")
|
||||
|
||||
l = [i for i in sys.stdin.read().split("\n") if i]
|
||||
header, statuses = l[0], l[1:]
|
||||
|
||||
status_index = header.find('STATUS')
|
||||
name_index = header.find('NAMES')
|
||||
|
||||
exit_code = 0
|
||||
|
||||
for i in statuses:
|
||||
if not i[status_index:].startswith("Up "):
|
||||
service_name = i[name_index:]
|
||||
print(f"Crash detected in {service_name}")
|
||||
exit_code = 1
|
||||
|
||||
sys.exit(exit_code)
|
||||
Reference in New Issue
Block a user