Compare commits
6 Commits
project/sh
...
1df1c6bd83
| Author | SHA1 | Date | |
|---|---|---|---|
| 1df1c6bd83 | |||
| 8160f030cd | |||
| a5dcf09bf2 | |||
| 8548714537 | |||
| 942ce98c4b | |||
| d4e369d5f8 |
@@ -2,5 +2,3 @@ import os
|
|||||||
|
|
||||||
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
STORE_HOST = os.environ.get("STORE_HOST") or "localhost"
|
||||||
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
STORE_PORT = os.environ.get("STORE_PORT") or 8000
|
||||||
|
|
||||||
TRACK_ID = int(os.environ.get("TID") or '1')
|
|
||||||
|
|||||||
@@ -72,8 +72,8 @@ class Datasource:
|
|||||||
)
|
)
|
||||||
new_points = [
|
new_points = [
|
||||||
(
|
(
|
||||||
processed_agent_data.longitude,
|
|
||||||
processed_agent_data.latitude,
|
processed_agent_data.latitude,
|
||||||
|
processed_agent_data.longitude,
|
||||||
processed_agent_data.road_state,
|
processed_agent_data.road_state,
|
||||||
processed_agent_data.user_id
|
processed_agent_data.user_id
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,6 +2,6 @@ from dataclasses import dataclass
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Accelerometer:
|
class Accelerometer:
|
||||||
x: float
|
x: int
|
||||||
y: float
|
y: int
|
||||||
z: float
|
z: int
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ from kivy_garden.mapview import MapMarker, MapView
|
|||||||
from kivy.clock import Clock
|
from kivy.clock import Clock
|
||||||
from lineMapLayer import LineMapLayer
|
from lineMapLayer import LineMapLayer
|
||||||
from datasource import Datasource
|
from datasource import Datasource
|
||||||
import config
|
|
||||||
|
|
||||||
line_layer_colors = [
|
line_layer_colors = [
|
||||||
[1, 0, 0, 1],
|
[1, 0, 0, 1],
|
||||||
@@ -33,7 +32,7 @@ class MapViewApp(App):
|
|||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
"""
|
"""
|
||||||
self.update()
|
self.update()
|
||||||
Clock.schedule_interval(self.update, 0.1)
|
Clock.schedule_interval(self.update, 5)
|
||||||
|
|
||||||
def update(self, *args):
|
def update(self, *args):
|
||||||
"""
|
"""
|
||||||
@@ -88,7 +87,6 @@ class MapViewApp(App):
|
|||||||
self.car_markers[user_id].lat = lat
|
self.car_markers[user_id].lat = lat
|
||||||
self.car_markers[user_id].lon = lon
|
self.car_markers[user_id].lon = lon
|
||||||
|
|
||||||
if user_id == config.TRACK_ID:
|
|
||||||
self.mapview.center_on(lat, lon)
|
self.mapview.center_on(lat, lon)
|
||||||
|
|
||||||
def set_pothole_marker(self, point):
|
def set_pothole_marker(self, point):
|
||||||
|
|||||||
@@ -3,6 +3,6 @@ from dataclasses import dataclass
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Accelerometer:
|
class Accelerometer:
|
||||||
x: float
|
x: int
|
||||||
y: float
|
y: int
|
||||||
z: float
|
z: int
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ class FileDatasource:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
acc_divisor: float,
|
|
||||||
accelerometer_filename: str,
|
accelerometer_filename: str,
|
||||||
gps_filename: str,
|
gps_filename: str,
|
||||||
park_filename: str,
|
park_filename: str,
|
||||||
@@ -35,8 +34,6 @@ class FileDatasource:
|
|||||||
|
|
||||||
self._started = False
|
self._started = False
|
||||||
|
|
||||||
self.acc_divisor = acc_divisor
|
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Must be called before read()"""
|
"""Must be called before read()"""
|
||||||
if self._started:
|
if self._started:
|
||||||
@@ -163,14 +160,15 @@ class FileDatasource:
|
|||||||
|
|
||||||
return row
|
return row
|
||||||
|
|
||||||
def _parse_acc(self, row: List[str]) -> Accelerometer:
|
@staticmethod
|
||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
if len(row) < 3:
|
if len(row) < 3:
|
||||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
x = int(row[0]) / self.acc_divisor
|
x = int(row[0])
|
||||||
y = int(row[1]) / self.acc_divisor
|
y = int(row[1])
|
||||||
z = int(row[2]) / self.acc_divisor
|
z = int(row[2])
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||||
|
|
||||||
|
|||||||
@@ -16,8 +16,6 @@ def connect_mqtt(broker, port):
|
|||||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||||
exit(rc) # Stop execution
|
exit(rc) # Stop execution
|
||||||
|
|
||||||
logging.info(f"Acting as USER_ID = {config.USER_ID}")
|
|
||||||
|
|
||||||
client = mqtt_client.Client()
|
client = mqtt_client.Client()
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.connect(broker, port)
|
client.connect(broker, port)
|
||||||
@@ -31,18 +29,17 @@ def publish(client, topic, datasource):
|
|||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
logging.debug(f"Published to {topic}: {msg[:50]}...")
|
logging.info(f"Published to {topic}: {msg[:50]}...")
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status != 0:
|
if status != 0:
|
||||||
logging.error(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
logging.basicConfig(level = logging.INFO)
|
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource)
|
publish(client, config.MQTT_TOPIC, datasource)
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
|
|||||||
|
|
||||||
|
|
||||||
class AccelerometerSchema(Schema):
|
class AccelerometerSchema(Schema):
|
||||||
x = fields.Float()
|
x = fields.Int()
|
||||||
y = fields.Float()
|
y = fields.Int()
|
||||||
z = fields.Float()
|
z = fields.Int()
|
||||||
|
|||||||
19
diagrams/general-sequence.puml
Normal file
19
diagrams/general-sequence.puml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
@startuml
|
||||||
|
actor Worker as worker
|
||||||
|
participant Agent as agent
|
||||||
|
participant "Edge" as edge
|
||||||
|
participant "Hub" as hub
|
||||||
|
participant "Store" as store
|
||||||
|
participant "MapView" as mapview
|
||||||
|
actor Spectator as spectator
|
||||||
|
|
||||||
|
worker -> agent : Start agent
|
||||||
|
agent -> edge : Send sensor data
|
||||||
|
edge -> edge : Classify road state
|
||||||
|
edge -> hub : Send processed data
|
||||||
|
hub -> hub : Save processed data
|
||||||
|
hub -> store : Send processed data batch
|
||||||
|
store -> store : Save processed data batch
|
||||||
|
store -> mapview : Send new rows
|
||||||
|
mapview -> spectator : Update paths and markers
|
||||||
|
@enduml
|
||||||
15
diagrams/main-activity-diagram.puml
Normal file
15
diagrams/main-activity-diagram.puml
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
@startuml
|
||||||
|
|Користувач|
|
||||||
|
start
|
||||||
|
:Натискає на іконку ями;
|
||||||
|
|Система|
|
||||||
|
:Запитує підтвердження видалення ями;
|
||||||
|
|Користувач|
|
||||||
|
:Підтверджує видалення ями;
|
||||||
|
|Система|
|
||||||
|
:Модифікує запис про яму;
|
||||||
|
note right #ff9999: Можлива виключна ситуація 1
|
||||||
|
:Видаляє яму;
|
||||||
|
|Користувач|
|
||||||
|
stop
|
||||||
|
@enduml
|
||||||
23
diagrams/use-case.puml
Normal file
23
diagrams/use-case.puml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
@startuml
|
||||||
|
rectangle IoT-Systems {
|
||||||
|
usecase "Collect telemetry (accelerometer + GPS)" as uc1
|
||||||
|
usecase "Send telemetry" as uc2
|
||||||
|
usecase "Process telemetry" as uc3
|
||||||
|
usecase "Determine road condition (pothole / bump /\nnormal)" as uc4
|
||||||
|
usecase "View road defect marks" as uc5
|
||||||
|
usecase "View route on map" as uc6
|
||||||
|
}
|
||||||
|
|
||||||
|
rectangle "The user is the card operator" as uc10
|
||||||
|
rectangle "Sensor Agent\n(Device/STM32/Emulator)" as uc11
|
||||||
|
|
||||||
|
uc11 - uc1
|
||||||
|
uc11 - uc2
|
||||||
|
|
||||||
|
uc10 - uc5
|
||||||
|
uc10 - uc6
|
||||||
|
|
||||||
|
uc2 -.|> uc3 : <<include>>
|
||||||
|
uc3 -.|> uc4 : <<include>>
|
||||||
|
|
||||||
|
@enduml
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
from app.entities.agent_data import AgentData
|
from app.entities.agent_data import AgentData
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
_last_detection_state = {}
|
|
||||||
|
|
||||||
def process_agent_data(
|
def process_agent_data(
|
||||||
agent_data: AgentData,
|
agent_data: AgentData,
|
||||||
@@ -13,24 +12,8 @@ def process_agent_data(
|
|||||||
Returns:
|
Returns:
|
||||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||||
"""
|
"""
|
||||||
user_id = agent_data.user_id
|
# Implement it
|
||||||
road_state = "normal"
|
|
||||||
|
|
||||||
last_detection_state = _last_detection_state.get(user_id, False)
|
|
||||||
|
|
||||||
if (agent_data.accelerometer.z < 0.6):
|
|
||||||
road_state = "pothole"
|
|
||||||
elif (agent_data.accelerometer.z > 1.2):
|
|
||||||
road_state = "bump"
|
|
||||||
|
|
||||||
detection_happened = road_state != "normal"
|
|
||||||
|
|
||||||
if not (not last_detection_state and detection_happened):
|
|
||||||
road_state = "normal"
|
|
||||||
|
|
||||||
_last_detection_state[user_id] = detection_happened
|
|
||||||
|
|
||||||
return ProcessedAgentData(
|
return ProcessedAgentData(
|
||||||
road_state=road_state,
|
road_state="normal",
|
||||||
agent_data=agent_data
|
agent_data=agent_data
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# WebSocket subscriptions
|
# WebSocket subscriptions
|
||||||
subscriptions: Set[WebSocket] = set()
|
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||||
|
|
||||||
|
|
||||||
# FastAPI WebSocket endpoint
|
# FastAPI WebSocket endpoint
|
||||||
@@ -41,31 +41,34 @@ subscriptions: Set[WebSocket] = set()
|
|||||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
|
|
||||||
subscriptions.add(websocket)
|
if user_id not in subscriptions:
|
||||||
|
subscriptions[user_id] = set()
|
||||||
|
|
||||||
|
subscriptions[user_id].add(websocket)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# send already available data
|
# send already available data
|
||||||
r = processed_agent_data.select().order_by(processed_agent_data.c.timestamp)
|
r = processed_agent_data.select()
|
||||||
stored_data = SessionLocal().execute(r).fetchall()
|
stored_data = SessionLocal().execute(r).fetchall()
|
||||||
|
|
||||||
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
|
jsonable_data = [{c.name: getattr(i, c.name) for c in processed_agent_data.columns} for i in stored_data]
|
||||||
for i in jsonable_data:
|
for i in jsonable_data:
|
||||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
for i in jsonable_data:
|
await websocket.send_json(json.dumps(jsonable_data))
|
||||||
await websocket.send_json(json.dumps([i]))
|
|
||||||
|
|
||||||
# receive forever
|
# receive forever
|
||||||
while True:
|
while True:
|
||||||
await websocket.receive_text()
|
await websocket.receive_text()
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
subscriptions.remove(websocket)
|
subscriptions[user_id].remove(websocket)
|
||||||
|
|
||||||
|
|
||||||
# Function to send data to subscribed users
|
# Function to send data to subscribed users
|
||||||
async def send_data_to_subscribers(data):
|
async def send_data_to_subscribers(user_id: int, data):
|
||||||
for websocket in subscriptions:
|
if user_id in subscriptions:
|
||||||
await websocket.send_json(json.dumps([data]))
|
for websocket in subscriptions[user_id]:
|
||||||
|
await websocket.send_json(json.dumps(data))
|
||||||
|
|
||||||
|
|
||||||
# FastAPI CRUDL endpoints
|
# FastAPI CRUDL endpoints
|
||||||
@@ -96,8 +99,8 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|||||||
created_records = [dict(row._mapping) for row in result.fetchall()]
|
created_records = [dict(row._mapping) for row in result.fetchall()]
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
for record in sorted(created_records, key = lambda x: x['timestamp']):
|
for record in created_records:
|
||||||
await send_data_to_subscribers(jsonable_encoder(record))
|
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
||||||
return created_records
|
return created_records
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
session.rollback()
|
session.rollback()
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
|
||||||
|
|
||||||
print("Checking for dead containers...")
|
print("Checking for dead containers...")
|
||||||
|
|
||||||
@@ -15,9 +14,6 @@ for i in statuses:
|
|||||||
if not i[status_index:].startswith("Up "):
|
if not i[status_index:].startswith("Up "):
|
||||||
service_name = i[name_index:]
|
service_name = i[name_index:]
|
||||||
print(f"Crash detected in {service_name}")
|
print(f"Crash detected in {service_name}")
|
||||||
print(f"docker logs for the container:\n")
|
|
||||||
os.system(f"docker logs {i.split(' ')[0]}")
|
|
||||||
print()
|
|
||||||
exit_code = 1
|
exit_code = 1
|
||||||
|
|
||||||
sys.exit(exit_code)
|
sys.exit(exit_code)
|
||||||
|
|||||||
Reference in New Issue
Block a user