Compare commits

...

28 Commits

Author SHA1 Message Date
hasslesstech 119547d288 [P] Add TRACK_ID selection in MapView
Component testing / Hub testing (push) Successful in 41s
Component testing / Store testing (push) Successful in 24s
Component testing / Integration smoke testing (push) Successful in 2m30s
2026-03-25 23:27:20 +02:00
hasslesstech b58167f0de [P] Fix wrong row sending order
Component testing / Hub testing (push) Successful in 35s
Component testing / Store testing (push) Successful in 26s
Component testing / Integration smoke testing (push) Successful in 2m37s
2026-03-25 23:26:39 +02:00
hasslesstech 121bd007b3 [P] Add container logs printout on crash
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Has been cancelled
2026-03-25 22:56:12 +02:00
hasslesstech db63eb6d79 [P] Improve logging logic in agent
Component testing / Hub testing (push) Successful in 24s
Component testing / Store testing (push) Successful in 27s
Component testing / Integration smoke testing (push) Successful in 2m25s
2026-03-25 15:40:07 +02:00
hasslesstech 77d6968297 [P] Fix map lat, lon confusion
Component testing / Hub testing (push) Successful in 23s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m22s
2026-03-25 14:57:38 +02:00
hasslesstech 0c2392dc0b Merge remote-tracking branch 'github/dev' into dev
Component testing / Hub testing (push) Successful in 20s
Component testing / Store testing (push) Successful in 22s
Component testing / Integration smoke testing (push) Successful in 2m18s
2026-03-25 13:12:18 +02:00
VladiusVostokus 65f767d38e Merge pull request #29 from Rhinemann/lab4/yushchenko-SCRUM-81-DataProcessing-implementation
[L4] SCRUM-81: implementation data processing function
2026-03-25 11:11:31 +00:00
hasslesstech 0695e3d092 [P] Use state machine approach to determine road condition 2026-03-25 12:16:34 +02:00
AndriiJushchenko d6e094e6c0 fix: delay logic in DP 2026-03-25 12:16:34 +02:00
AndriiJushchenko 2167eb2960 DP implementation + delay 2026-03-25 12:16:34 +02:00
VladiusVostokus 38374a6723 Merge pull request #28 from Rhinemann/lab4/shmuliar-FIX-01-wrong-acceleration-data-types
[P] Fix acceleration data types
2026-03-25 10:14:10 +00:00
hasslesstech c08612f71a [P] Fix acceleration data types 2026-03-25 10:10:29 +02:00
hasslesstech bde51ca5e1 [P] Fix Store -> MapView websocket incompatibility
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 20s
Component testing / Integration smoke testing (push) Successful in 2m19s
2026-03-24 16:57:29 +02:00
hasslesstech a204bb1676 [P] Split rows into websocket-sendable messages 2026-03-24 16:38:16 +02:00
hasslesstech 764fb77f27 Merge from upstream 'github/dev' into dev
Component testing / Hub testing (push) Successful in 21s
Component testing / Store testing (push) Successful in 18s
Component testing / Integration smoke testing (push) Successful in 2m14s
2026-03-24 15:21:56 +02:00
VladiusVostokus a55fc17711 Merge pull request #27 from Rhinemann/lab4/hrynko-SCRUM-80-hubadapter
implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
2026-03-24 13:20:39 +00:00
hasslesstech b34e385128 [L4] Remove irrelevant environment variables 2026-03-24 14:42:13 +02:00
hasslesstech a8a0ef5e15 [L4] Remove unused import 2026-03-24 14:09:53 +02:00
hasslesstech 00b037a243 [L4] Remove uncessecary environment variable left over after print -> logging usage switch 2026-03-24 14:07:49 +02:00
hasslesstech d1b6c0eed1 [L4] Fix logging level on error message 2026-03-24 14:03:29 +02:00
hasslesstech 5e890d4f03 [L4] Remove obvious single code line comments to reduce risk of misleading comments in the future 2026-03-24 14:02:03 +02:00
hasslesstech a8e50d0386 [L4] Remove excessive library import and clean up edge codebase 2026-03-24 13:58:32 +02:00
hasslesstech 1b42be264d [L4] Remove misleading batch_size parameter from AgentMQTTAdapter 2026-03-24 13:37:33 +02:00
esk4nz b12bdc334c fix: improve logging clarity and ensure data delivery in AgentMQTTAdapter 2026-03-23 23:41:25 +02:00
esk4nz e8ff1c6cbd Replaced busy-wait loop with threading.Event to fix 100% CPU load 2026-03-23 23:28:17 +02:00
esk4nz ad70519f47 SCRUM-80
Removed manual user_id fallback in data processing
2026-03-23 23:00:19 +02:00
esk4nz b10aec1020 SCRUM-80
Changed print to logging in agent main
2026-03-23 22:57:15 +02:00
esk4nz c085a49c8c implement Edge-Hub integration with user_id validation (SCRUM-93, SCRUM-94)
- Agent: Updated config and main
- Edge: Implemented adapter factory in main.py to switch between MQTT and HTTP.
- Edge: Updated AgentData entity and processing logic to support user_id.
- Infrastructure: Configured docker-compose for dynamic protocol switching and environment management.
2026-03-23 21:31:31 +02:00
18 changed files with 121 additions and 101 deletions
+2
View File
@@ -2,3 +2,5 @@ import os
STORE_HOST = os.environ.get("STORE_HOST") or "localhost" STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
STORE_PORT = os.environ.get("STORE_PORT") or 8000 STORE_PORT = os.environ.get("STORE_PORT") or 8000
TRACK_ID = int(os.environ.get("TID") or '1')
+1 -1
View File
@@ -72,8 +72,8 @@ class Datasource:
) )
new_points = [ new_points = [
( (
processed_agent_data.latitude,
processed_agent_data.longitude, processed_agent_data.longitude,
processed_agent_data.latitude,
processed_agent_data.road_state, processed_agent_data.road_state,
processed_agent_data.user_id processed_agent_data.user_id
) )
+3 -3
View File
@@ -2,6 +2,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float
+4 -2
View File
@@ -4,6 +4,7 @@ from kivy_garden.mapview import MapMarker, MapView
from kivy.clock import Clock from kivy.clock import Clock
from lineMapLayer import LineMapLayer from lineMapLayer import LineMapLayer
from datasource import Datasource from datasource import Datasource
import config
line_layer_colors = [ line_layer_colors = [
[1, 0, 0, 1], [1, 0, 0, 1],
@@ -32,7 +33,7 @@ class MapViewApp(App):
Встановлює необхідні маркери, викликає функцію для оновлення мапи Встановлює необхідні маркери, викликає функцію для оновлення мапи
""" """
self.update() self.update()
Clock.schedule_interval(self.update, 5) Clock.schedule_interval(self.update, 0.1)
def update(self, *args): def update(self, *args):
""" """
@@ -87,7 +88,8 @@ class MapViewApp(App):
self.car_markers[user_id].lat = lat self.car_markers[user_id].lat = lat
self.car_markers[user_id].lon = lon self.car_markers[user_id].lon = lon
self.mapview.center_on(lat, lon) if user_id == config.TRACK_ID:
self.mapview.center_on(lat, lon)
def set_pothole_marker(self, point): def set_pothole_marker(self, point):
if isinstance(point, dict): if isinstance(point, dict):
+1 -1
View File
@@ -8,7 +8,7 @@ def try_parse(type, value: str):
return None return None
USER_ID = 1 USER_ID = try_parse(int, os.environ.get("USER_ID")) or 1
# MQTT config # MQTT config
MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt" MQTT_BROKER_HOST = os.environ.get("MQTT_BROKER_HOST") or "mqtt"
MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883 MQTT_BROKER_PORT = try_parse(int, os.environ.get("MQTT_BROKER_PORT")) or 1883
+3 -3
View File
@@ -3,6 +3,6 @@ from dataclasses import dataclass
@dataclass @dataclass
class Accelerometer: class Accelerometer:
x: int x: float
y: int y: float
z: int z: float
+7 -5
View File
@@ -15,6 +15,7 @@ class FileDatasource:
def __init__( def __init__(
self, self,
acc_divisor: float,
accelerometer_filename: str, accelerometer_filename: str,
gps_filename: str, gps_filename: str,
park_filename: str, park_filename: str,
@@ -34,6 +35,8 @@ class FileDatasource:
self._started = False self._started = False
self.acc_divisor = acc_divisor
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Must be called before read()""" """Must be called before read()"""
if self._started: if self._started:
@@ -160,15 +163,14 @@ class FileDatasource:
return row return row
@staticmethod def _parse_acc(self, row: List[str]) -> Accelerometer:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3: if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try: try:
x = int(row[0]) x = int(row[0]) / self.acc_divisor
y = int(row[1]) y = int(row[1]) / self.acc_divisor
z = int(row[2]) z = int(row[2]) / self.acc_divisor
except ValueError as e: except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
+7 -2
View File
@@ -1,6 +1,7 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import logging
import config import config
@@ -15,6 +16,8 @@ def connect_mqtt(broker, port):
print("Failed to connect {broker}:{port}, return code %d\n", rc) print("Failed to connect {broker}:{port}, return code %d\n", rc)
exit(rc) # Stop execution exit(rc) # Stop execution
logging.info(f"Acting as USER_ID = {config.USER_ID}")
client = mqtt_client.Client() client = mqtt_client.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.connect(broker, port) client.connect(broker, port)
@@ -28,16 +31,18 @@ def publish(client, topic, datasource):
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
logging.debug(f"Published to {topic}: {msg[:50]}...")
status = result[0] status = result[0]
if status != 0: if status != 0:
print(f"Failed to send message to topic {topic}") logging.error(f"Failed to send message to topic {topic}")
def run(): def run():
logging.basicConfig(level = logging.INFO)
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)
+3 -3
View File
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
class AccelerometerSchema(Schema): class AccelerometerSchema(Schema):
x = fields.Int() x = fields.Float()
y = fields.Int() y = fields.Float()
z = fields.Int() z = fields.Float()
+6 -2
View File
@@ -22,6 +22,7 @@ services:
depends_on: depends_on:
- mqtt - mqtt
environment: environment:
PYTHONUNBUFFERED: 1
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic" MQTT_TOPIC: "agent_data_topic"
@@ -39,9 +40,10 @@ services:
environment: environment:
MQTT_BROKER_HOST: "mqtt" MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883 MQTT_BROKER_PORT: 1883
MQTT_TOPIC: " " MQTT_TOPIC: "agent_data_topic"
HUB_HOST: "store" HUB_HOST: "hub"
HUB_PORT: 8000 HUB_PORT: 8000
HUB_CONNECTION_TYPE: "http"
HUB_MQTT_BROKER_HOST: "mqtt" HUB_MQTT_BROKER_HOST: "mqtt"
HUB_MQTT_BROKER_PORT: 1883 HUB_MQTT_BROKER_PORT: 1883
HUB_MQTT_TOPIC: "processed_data_topic" HUB_MQTT_TOPIC: "processed_data_topic"
@@ -89,6 +91,7 @@ services:
- postgres_db - postgres_db
restart: always restart: always
environment: environment:
PYTHONUNBUFFERED: 1
POSTGRES_USER: user POSTGRES_USER: user
POSTGRES_PASSWORD: pass POSTGRES_PASSWORD: pass
POSTGRES_DB: test_db POSTGRES_DB: test_db
@@ -120,6 +123,7 @@ services:
- redis - redis
- store - store
environment: environment:
PYTHONUNBUFFERED: 1
STORE_API_HOST: "store" STORE_API_HOST: "store"
STORE_API_PORT: 8000 STORE_API_PORT: 8000
REDIS_HOST: "redis" REDIS_HOST: "redis"
+9 -32
View File
@@ -13,9 +13,7 @@ class AgentMQTTAdapter(AgentGateway):
broker_port, broker_port,
topic, topic,
hub_gateway: HubGateway, hub_gateway: HubGateway,
batch_size=10,
): ):
self.batch_size = batch_size
# MQTT # MQTT
self.broker_host = broker_host self.broker_host = broker_host
self.broker_port = broker_port self.broker_port = broker_port
@@ -35,42 +33,21 @@ class AgentMQTTAdapter(AgentGateway):
"""Processing agent data and sent it to hub gateway""" """Processing agent data and sent it to hub gateway"""
try: try:
payload: str = msg.payload.decode("utf-8") payload: str = msg.payload.decode("utf-8")
# Create AgentData instance with the received data
agent_data = AgentData.model_validate_json(payload, strict=True) agent_data = AgentData.model_validate_json(payload, strict=True)
# Process the received data (you can call a use case here if needed)
processed_data = process_agent_data(agent_data) processed_data = process_agent_data(agent_data)
# Store the agent_data in the database (you can send it to the data processing module)
if not self.hub_gateway.save_data(processed_data): if self.hub_gateway.save_data(processed_data):
logging.error("Hub is not available") logging.info("Processed data successfully forwarded to the Hub.")
else:
logging.error("Failed to send data: Hub gateway is unavailable.")
except Exception as e: except Exception as e:
logging.info(f"Error processing MQTT message: {e}") logging.error(f"Error processing MQTT message: {e}")
def connect(self): def connect(self):
self.client.on_connect = self.on_connect self.client.on_connect = self.on_connect
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.client.connect(self.broker_host, self.broker_port, 60) self.client.connect(self.broker_host, self.broker_port, 60)
def start(self): def loop_forever(self):
self.client.loop_start() self.client.loop_forever()
def stop(self):
self.client.loop_stop()
# Usage example:
if __name__ == "__main__":
broker_host = "localhost"
broker_port = 1883
topic = "agent_data_topic"
# Assuming you have implemented the StoreGateway and passed it to the adapter
store_gateway = HubGateway()
adapter = AgentMQTTAdapter(broker_host, broker_port, topic, store_gateway)
adapter.connect()
adapter.start()
try:
# Keep the adapter running in the background
while True:
pass
except KeyboardInterrupt:
adapter.stop()
logging.info("Adapter stopped.")
+1
View File
@@ -14,6 +14,7 @@ class GpsData(BaseModel):
class AgentData(BaseModel): class AgentData(BaseModel):
user_id: int
accelerometer: AccelerometerData accelerometer: AccelerometerData
gps: GpsData gps: GpsData
timestamp: datetime timestamp: datetime
+2 -9
View File
@@ -26,15 +26,8 @@ class AgentGateway(ABC):
pass pass
@abstractmethod @abstractmethod
def start(self): def loop_forever(self):
""" """
Method to start listening for messages from the agent. Method to await for new messages.
"""
pass
@abstractmethod
def stop(self):
"""
Method to stop the agent gateway and clean up resources.
""" """
pass pass
+22 -1
View File
@@ -1,6 +1,7 @@
from app.entities.agent_data import AgentData from app.entities.agent_data import AgentData
from app.entities.processed_agent_data import ProcessedAgentData from app.entities.processed_agent_data import ProcessedAgentData
_last_detection_state = {}
def process_agent_data( def process_agent_data(
agent_data: AgentData, agent_data: AgentData,
@@ -12,4 +13,24 @@ def process_agent_data(
Returns: Returns:
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data. processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
""" """
# Implement it user_id = agent_data.user_id
road_state = "normal"
last_detection_state = _last_detection_state.get(user_id, False)
if (agent_data.accelerometer.z < 0.6):
road_state = "pothole"
elif (agent_data.accelerometer.z > 1.2):
road_state = "bump"
detection_happened = road_state != "normal"
if not (not last_detection_state and detection_happened):
road_state = "normal"
_last_detection_state[user_id] = detection_happened
return ProcessedAgentData(
road_state=road_state,
agent_data=agent_data
)
+5 -2
View File
@@ -16,9 +16,12 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent_data_topic"
# Configuration for hub MQTT # Configuration for hub MQTT
HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost" HUB_MQTT_BROKER_HOST = os.environ.get("HUB_MQTT_BROKER_HOST") or "localhost"
HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883 HUB_MQTT_BROKER_PORT = try_parse_int(os.environ.get("HUB_MQTT_BROKER_PORT")) or 1883
HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_agent_data_topic" HUB_MQTT_TOPIC = os.environ.get("HUB_MQTT_TOPIC") or "processed_data_topic"
# Configuration for the Hub # Configuration for the Hub
HUB_HOST = os.environ.get("HUB_HOST") or "localhost" HUB_HOST = os.environ.get("HUB_HOST") or "localhost"
HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 12000 HUB_PORT = try_parse_int(os.environ.get("HUB_PORT")) or 8000
HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}" HUB_URL = f"http://{HUB_HOST}:{HUB_PORT}"
# For choosing type of connection
HUB_CONNECTION_TYPE = os.environ.get("HUB_CONNECTION_TYPE") or "mqtt"
+30 -21
View File
@@ -10,42 +10,51 @@ from config import (
HUB_MQTT_BROKER_HOST, HUB_MQTT_BROKER_HOST,
HUB_MQTT_BROKER_PORT, HUB_MQTT_BROKER_PORT,
HUB_MQTT_TOPIC, HUB_MQTT_TOPIC,
HUB_CONNECTION_TYPE,
) )
if __name__ == "__main__": if __name__ == "__main__":
# Configure logging settings # Configure logging settings
logging.basicConfig( logging.basicConfig(
level=logging.INFO, # Set the log level to INFO (you can use logging.DEBUG for more detailed logs) level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s", format="[%(asctime)s] [%(levelname)s] [%(module)s] %(message)s",
handlers=[ handlers=[
logging.StreamHandler(), # Output log messages to the console logging.StreamHandler(),
logging.FileHandler("app.log"), # Save log messages to a file logging.FileHandler("app.log"),
], ],
) )
# Create an instance of the StoreApiAdapter using the configuration
# hub_adapter = HubHttpAdapter( # Logic to select the adapter based on configuration (SCRUM-93 & SCRUM-94)
# api_base_url=HUB_URL, # This allows easy switching between HTTP and MQTT protocols
# ) if HUB_CONNECTION_TYPE.lower() == "http":
hub_adapter = HubMqttAdapter( logging.info("Initializing HubHttpAdapter (SCRUM-93 integration)")
broker=HUB_MQTT_BROKER_HOST, hub_adapter = HubHttpAdapter(
port=HUB_MQTT_BROKER_PORT, api_base_url=HUB_URL,
topic=HUB_MQTT_TOPIC, )
) else:
# Create an instance of the AgentMQTTAdapter using the configuration logging.info("Initializing HubMqttAdapter (SCRUM-94 integration)")
hub_adapter = HubMqttAdapter(
broker=HUB_MQTT_BROKER_HOST,
port=HUB_MQTT_BROKER_PORT,
topic=HUB_MQTT_TOPIC,
)
# Create an instance of the AgentMQTTAdapter using the selected hub adapter
# This adapter acts as a bridge between the Agent and the Hub
agent_adapter = AgentMQTTAdapter( agent_adapter = AgentMQTTAdapter(
broker_host=MQTT_BROKER_HOST, broker_host=MQTT_BROKER_HOST,
broker_port=MQTT_BROKER_PORT, broker_port=MQTT_BROKER_PORT,
topic=MQTT_TOPIC, topic=MQTT_TOPIC,
hub_gateway=hub_adapter, hub_gateway=hub_adapter,
) )
try: try:
# Connect to the MQTT broker and start listening for messages logging.info(f"Connecting to MQTT broker at {MQTT_BROKER_HOST}:{MQTT_BROKER_PORT}")
agent_adapter.connect() agent_adapter.connect()
agent_adapter.start()
# Keep the system running indefinitely (you can add other logic as needed) logging.info("Broker connection success. Waiting for data...")
while True: agent_adapter.loop_forever()
pass
except KeyboardInterrupt: except KeyboardInterrupt:
# Stop the MQTT adapter and exit gracefully if interrupted by the user logging.info("Interrupt signal received. Shutting down...")
agent_adapter.stop() agent_adapter.disconnect()
logging.info("System stopped.") logging.info("Disconnected from MQTT broker.")
+11 -14
View File
@@ -33,7 +33,7 @@ processed_agent_data = Table(
) )
# WebSocket subscriptions # WebSocket subscriptions
subscriptions: Dict[int, Set[WebSocket]] = {} subscriptions: Set[WebSocket] = set()
# FastAPI WebSocket endpoint # FastAPI WebSocket endpoint
@@ -41,34 +41,31 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
async def websocket_endpoint(websocket: WebSocket, user_id: int): async def websocket_endpoint(websocket: WebSocket, user_id: int):
await websocket.accept() await websocket.accept()
if user_id not in subscriptions: subscriptions.add(websocket)
subscriptions[user_id] = set()
subscriptions[user_id].add(websocket)
try: try:
# send already available data # send already available data
r = processed_agent_data.select() r = processed_agent_data.select().order_by(processed_agent_data.c.timestamp)
stored_data = SessionLocal().execute(r).fetchall() stored_data = SessionLocal().execute(r).fetchall()
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data] jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
for i in jsonable_data: for i in jsonable_data:
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ") i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
await websocket.send_json(json.dumps(jsonable_data)) for i in jsonable_data:
await websocket.send_json(json.dumps([i]))
# receive forever # receive forever
while True: while True:
await websocket.receive_text() await websocket.receive_text()
except WebSocketDisconnect: except WebSocketDisconnect:
subscriptions[user_id].remove(websocket) subscriptions.remove(websocket)
# Function to send data to subscribed users # Function to send data to subscribed users
async def send_data_to_subscribers(user_id: int, data): async def send_data_to_subscribers(data):
if user_id in subscriptions: for websocket in subscriptions:
for websocket in subscriptions[user_id]: await websocket.send_json(json.dumps([data]))
await websocket.send_json(json.dumps(data))
# FastAPI CRUDL endpoints # FastAPI CRUDL endpoints
@@ -99,8 +96,8 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
created_records = [dict(row._mapping) for row in result.fetchall()] created_records = [dict(row._mapping) for row in result.fetchall()]
session.commit() session.commit()
for record in created_records: for record in sorted(created_records, key = lambda x: x['timestamp']):
await send_data_to_subscribers(user_id, jsonable_encoder(record)) await send_data_to_subscribers(jsonable_encoder(record))
return created_records return created_records
except Exception as err: except Exception as err:
session.rollback() session.rollback()
+4
View File
@@ -1,4 +1,5 @@
import sys import sys
import os
print("Checking for dead containers...") print("Checking for dead containers...")
@@ -14,6 +15,9 @@ for i in statuses:
if not i[status_index:].startswith("Up "): if not i[status_index:].startswith("Up "):
service_name = i[name_index:] service_name = i[name_index:]
print(f"Crash detected in {service_name}") print(f"Crash detected in {service_name}")
print(f"docker logs for the container:\n")
os.system(f"docker logs {i.split(' ')[0]}")
print()
exit_code = 1 exit_code = 1
sys.exit(exit_code) sys.exit(exit_code)