Compare commits
6 Commits
project/sh
...
1df1c6bd83
| Author | SHA1 | Date | |
|---|---|---|---|
| 1df1c6bd83 | |||
| 8160f030cd | |||
| a5dcf09bf2 | |||
| 8548714537 | |||
| 942ce98c4b | |||
| d4e369d5f8 |
@@ -2,5 +2,3 @@ import os
|
||||
|
||||
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
||||
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
||||
|
||||
TRACK_ID = int(os.environ.get("TID") or '1')
|
||||
|
||||
@@ -72,8 +72,8 @@ class Datasource:
|
||||
)
|
||||
new_points = [
|
||||
(
|
||||
processed_agent_data.longitude,
|
||||
processed_agent_data.latitude,
|
||||
processed_agent_data.longitude,
|
||||
processed_agent_data.road_state,
|
||||
processed_agent_data.user_id
|
||||
)
|
||||
|
||||
@@ -2,6 +2,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
|
||||
@@ -4,7 +4,6 @@ from kivy_garden.mapview import MapMarker, MapView
|
||||
from kivy.clock import Clock
|
||||
from lineMapLayer import LineMapLayer
|
||||
from datasource import Datasource
|
||||
import config
|
||||
|
||||
line_layer_colors = [
|
||||
[1, 0, 0, 1],
|
||||
@@ -33,7 +32,7 @@ class MapViewApp(App):
|
||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||
"""
|
||||
self.update()
|
||||
Clock.schedule_interval(self.update, 0.1)
|
||||
Clock.schedule_interval(self.update, 5)
|
||||
|
||||
def update(self, *args):
|
||||
"""
|
||||
@@ -88,8 +87,7 @@ class MapViewApp(App):
|
||||
self.car_markers[user_id].lat = lat
|
||||
self.car_markers[user_id].lon = lon
|
||||
|
||||
if user_id == config.TRACK_ID:
|
||||
self.mapview.center_on(lat, lon)
|
||||
self.mapview.center_on(lat, lon)
|
||||
|
||||
def set_pothole_marker(self, point):
|
||||
if isinstance(point, dict):
|
||||
|
||||
@@ -3,6 +3,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
|
||||
@@ -15,7 +15,6 @@ class FileDatasource:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
acc_divisor: float,
|
||||
accelerometer_filename: str,
|
||||
gps_filename: str,
|
||||
park_filename: str,
|
||||
@@ -35,8 +34,6 @@ class FileDatasource:
|
||||
|
||||
self._started = False
|
||||
|
||||
self.acc_divisor = acc_divisor
|
||||
|
||||
def startReading(self, *args, **kwargs):
|
||||
"""Must be called before read()"""
|
||||
if self._started:
|
||||
@@ -163,14 +160,15 @@ class FileDatasource:
|
||||
|
||||
return row
|
||||
|
||||
def _parse_acc(self, row: List[str]) -> Accelerometer:
|
||||
@staticmethod
|
||||
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||
if len(row) < 3:
|
||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||
|
||||
try:
|
||||
x = int(row[0]) / self.acc_divisor
|
||||
y = int(row[1]) / self.acc_divisor
|
||||
z = int(row[2]) / self.acc_divisor
|
||||
x = int(row[0])
|
||||
y = int(row[1])
|
||||
z = int(row[2])
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||
|
||||
|
||||
@@ -16,8 +16,6 @@ def connect_mqtt(broker, port):
|
||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||
exit(rc) # Stop execution
|
||||
|
||||
logging.info(f"Acting as USER_ID = {config.USER_ID}")
|
||||
|
||||
client = mqtt_client.Client()
|
||||
client.on_connect = on_connect
|
||||
client.connect(broker, port)
|
||||
@@ -31,18 +29,17 @@ def publish(client, topic, datasource):
|
||||
data = datasource.read()
|
||||
msg = AggregatedDataSchema().dumps(data)
|
||||
result = client.publish(topic, msg)
|
||||
logging.debug(f"Published to {topic}: {msg[:50]}...")
|
||||
logging.info(f"Published to {topic}: {msg[:50]}...")
|
||||
status = result[0]
|
||||
if status != 0:
|
||||
logging.error(f"Failed to send message to topic {topic}")
|
||||
print(f"Failed to send message to topic {topic}")
|
||||
|
||||
|
||||
def run():
|
||||
logging.basicConfig(level = logging.INFO)
|
||||
# Prepare mqtt client
|
||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||
# Prepare datasource
|
||||
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
# Infinity publish data
|
||||
publish(client, config.MQTT_TOPIC, datasource)
|
||||
|
||||
|
||||
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
|
||||
|
||||
|
||||
class AccelerometerSchema(Schema):
|
||||
x = fields.Float()
|
||||
y = fields.Float()
|
||||
z = fields.Float()
|
||||
x = fields.Int()
|
||||
y = fields.Int()
|
||||
z = fields.Int()
|
||||
|
||||
19
diagrams/general-sequence.puml
Normal file
19
diagrams/general-sequence.puml
Normal file
@@ -0,0 +1,19 @@
|
||||
@startuml
|
||||
actor Worker as worker
|
||||
participant Agent as agent
|
||||
participant "Edge" as edge
|
||||
participant "Hub" as hub
|
||||
participant "Store" as store
|
||||
participant "MapView" as mapview
|
||||
actor Spectator as spectator
|
||||
|
||||
worker -> agent : Start agent
|
||||
agent -> edge : Send sensor data
|
||||
edge -> edge : Classify road state
|
||||
edge -> hub : Send processed data
|
||||
hub -> hub : Save processed data
|
||||
hub -> store : Send processed data batch
|
||||
store -> store : Save processed data batch
|
||||
store -> mapview : Send new rows
|
||||
mapview -> spectator : Update paths and markers
|
||||
@enduml
|
||||
15
diagrams/main-activity-diagram.puml
Normal file
15
diagrams/main-activity-diagram.puml
Normal file
@@ -0,0 +1,15 @@
|
||||
@startuml
|
||||
|Користувач|
|
||||
start
|
||||
:Натискає на іконку ями;
|
||||
|Система|
|
||||
:Запитує підтвердження видалення ями;
|
||||
|Користувач|
|
||||
:Підтверджує видалення ями;
|
||||
|Система|
|
||||
:Модифікує запис про яму;
|
||||
note right #ff9999: Можлива виключна ситуація 1
|
||||
:Видаляє яму;
|
||||
|Користувач|
|
||||
stop
|
||||
@enduml
|
||||
23
diagrams/use-case.puml
Normal file
23
diagrams/use-case.puml
Normal file
@@ -0,0 +1,23 @@
|
||||
@startuml
|
||||
rectangle IoT-Systems {
|
||||
usecase "Collect telemetry (accelerometer + GPS)" as uc1
|
||||
usecase "Send telemetry" as uc2
|
||||
usecase "Process telemetry" as uc3
|
||||
usecase "Determine road condition (pothole / bump /\nnormal)" as uc4
|
||||
usecase "View road defect marks" as uc5
|
||||
usecase "View route on map" as uc6
|
||||
}
|
||||
|
||||
rectangle "The user is the card operator" as uc10
|
||||
rectangle "Sensor Agent\n(Device/STM32/Emulator)" as uc11
|
||||
|
||||
uc11 - uc1
|
||||
uc11 - uc2
|
||||
|
||||
uc10 - uc5
|
||||
uc10 - uc6
|
||||
|
||||
uc2 -.|> uc3 : <<include>>
|
||||
uc3 -.|> uc4 : <<include>>
|
||||
|
||||
@enduml
|
||||
@@ -1,7 +1,6 @@
|
||||
from app.entities.agent_data import AgentData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
_last_detection_state = {}
|
||||
|
||||
def process_agent_data(
|
||||
agent_data: AgentData,
|
||||
@@ -13,24 +12,8 @@ def process_agent_data(
|
||||
Returns:
|
||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||
"""
|
||||
user_id = agent_data.user_id
|
||||
road_state = "normal"
|
||||
|
||||
last_detection_state = _last_detection_state.get(user_id, False)
|
||||
|
||||
if (agent_data.accelerometer.z < 0.6):
|
||||
road_state = "pothole"
|
||||
elif (agent_data.accelerometer.z > 1.2):
|
||||
road_state = "bump"
|
||||
|
||||
detection_happened = road_state != "normal"
|
||||
|
||||
if not (not last_detection_state and detection_happened):
|
||||
road_state = "normal"
|
||||
|
||||
_last_detection_state[user_id] = detection_happened
|
||||
|
||||
# Implement it
|
||||
return ProcessedAgentData(
|
||||
road_state=road_state,
|
||||
road_state="normal",
|
||||
agent_data=agent_data
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
||||
)
|
||||
|
||||
# WebSocket subscriptions
|
||||
subscriptions: Set[WebSocket] = set()
|
||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||
|
||||
|
||||
# FastAPI WebSocket endpoint
|
||||
@@ -41,31 +41,34 @@ subscriptions: Set[WebSocket] = set()
|
||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||
await websocket.accept()
|
||||
|
||||
subscriptions.add(websocket)
|
||||
if user_id not in subscriptions:
|
||||
subscriptions[user_id] = set()
|
||||
|
||||
subscriptions[user_id].add(websocket)
|
||||
|
||||
try:
|
||||
# send already available data
|
||||
r = processed_agent_data.select().order_by(processed_agent_data.c.timestamp)
|
||||
r = processed_agent_data.select()
|
||||
stored_data = SessionLocal().execute(r).fetchall()
|
||||
|
||||
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
|
||||
for i in jsonable_data:
|
||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
for i in jsonable_data:
|
||||
await websocket.send_json(json.dumps([i]))
|
||||
await websocket.send_json(json.dumps(jsonable_data))
|
||||
|
||||
# receive forever
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
subscriptions.remove(websocket)
|
||||
subscriptions[user_id].remove(websocket)
|
||||
|
||||
|
||||
# Function to send data to subscribed users
|
||||
async def send_data_to_subscribers(data):
|
||||
for websocket in subscriptions:
|
||||
await websocket.send_json(json.dumps([data]))
|
||||
async def send_data_to_subscribers(user_id: int, data):
|
||||
if user_id in subscriptions:
|
||||
for websocket in subscriptions[user_id]:
|
||||
await websocket.send_json(json.dumps(data))
|
||||
|
||||
|
||||
# FastAPI CRUDL endpoints
|
||||
@@ -96,8 +99,8 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
||||
created_records = [dict(row._mapping) for row in result.fetchall()]
|
||||
session.commit()
|
||||
|
||||
for record in sorted(created_records, key = lambda x: x['timestamp']):
|
||||
await send_data_to_subscribers(jsonable_encoder(record))
|
||||
for record in created_records:
|
||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
||||
return created_records
|
||||
except Exception as err:
|
||||
session.rollback()
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
print("Checking for dead containers...")
|
||||
|
||||
@@ -15,9 +14,6 @@ for i in statuses:
|
||||
if not i[status_index:].startswith("Up "):
|
||||
service_name = i[name_index:]
|
||||
print(f"Crash detected in {service_name}")
|
||||
print(f"docker logs for the container:\n")
|
||||
os.system(f"docker logs {i.split(' ')[0]}")
|
||||
print()
|
||||
exit_code = 1
|
||||
|
||||
sys.exit(exit_code)
|
||||
|
||||
Reference in New Issue
Block a user