Compare commits
12 Commits
1df1c6bd83
...
project/sh
| Author | SHA1 | Date | |
|---|---|---|---|
| 121bd007b3 | |||
| db63eb6d79 | |||
| 77d6968297 | |||
| 0c2392dc0b | |||
|
|
65f767d38e | ||
| 0695e3d092 | |||
|
|
d6e094e6c0 | ||
|
|
2167eb2960 | ||
|
|
38374a6723 | ||
| c08612f71a | |||
| bde51ca5e1 | |||
| a204bb1676 |
@@ -72,8 +72,8 @@ class Datasource:
|
|||||||
)
|
)
|
||||||
new_points = [
|
new_points = [
|
||||||
(
|
(
|
||||||
processed_agent_data.latitude,
|
|
||||||
processed_agent_data.longitude,
|
processed_agent_data.longitude,
|
||||||
|
processed_agent_data.latitude,
|
||||||
processed_agent_data.road_state,
|
processed_agent_data.road_state,
|
||||||
processed_agent_data.user_id
|
processed_agent_data.user_id
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,6 +2,6 @@ from dataclasses import dataclass
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Accelerometer:
|
class Accelerometer:
|
||||||
x: int
|
x: float
|
||||||
y: int
|
y: float
|
||||||
z: int
|
z: float
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ class MapViewApp(App):
|
|||||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||||
"""
|
"""
|
||||||
self.update()
|
self.update()
|
||||||
Clock.schedule_interval(self.update, 5)
|
Clock.schedule_interval(self.update, 0.1)
|
||||||
|
|
||||||
def update(self, *args):
|
def update(self, *args):
|
||||||
"""
|
"""
|
||||||
@@ -87,7 +87,8 @@ class MapViewApp(App):
|
|||||||
self.car_markers[user_id].lat = lat
|
self.car_markers[user_id].lat = lat
|
||||||
self.car_markers[user_id].lon = lon
|
self.car_markers[user_id].lon = lon
|
||||||
|
|
||||||
self.mapview.center_on(lat, lon)
|
if user_id == 1:
|
||||||
|
self.mapview.center_on(lat, lon)
|
||||||
|
|
||||||
def set_pothole_marker(self, point):
|
def set_pothole_marker(self, point):
|
||||||
if isinstance(point, dict):
|
if isinstance(point, dict):
|
||||||
|
|||||||
@@ -3,6 +3,6 @@ from dataclasses import dataclass
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Accelerometer:
|
class Accelerometer:
|
||||||
x: int
|
x: float
|
||||||
y: int
|
y: float
|
||||||
z: int
|
z: float
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ class FileDatasource:
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
acc_divisor: float,
|
||||||
accelerometer_filename: str,
|
accelerometer_filename: str,
|
||||||
gps_filename: str,
|
gps_filename: str,
|
||||||
park_filename: str,
|
park_filename: str,
|
||||||
@@ -34,6 +35,8 @@ class FileDatasource:
|
|||||||
|
|
||||||
self._started = False
|
self._started = False
|
||||||
|
|
||||||
|
self.acc_divisor = acc_divisor
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Must be called before read()"""
|
"""Must be called before read()"""
|
||||||
if self._started:
|
if self._started:
|
||||||
@@ -160,15 +163,14 @@ class FileDatasource:
|
|||||||
|
|
||||||
return row
|
return row
|
||||||
|
|
||||||
@staticmethod
|
def _parse_acc(self, row: List[str]) -> Accelerometer:
|
||||||
def _parse_acc(row: List[str]) -> Accelerometer:
|
|
||||||
if len(row) < 3:
|
if len(row) < 3:
|
||||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
x = int(row[0])
|
x = int(row[0]) / self.acc_divisor
|
||||||
y = int(row[1])
|
y = int(row[1]) / self.acc_divisor
|
||||||
z = int(row[2])
|
z = int(row[2]) / self.acc_divisor
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ def connect_mqtt(broker, port):
|
|||||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||||
exit(rc) # Stop execution
|
exit(rc) # Stop execution
|
||||||
|
|
||||||
|
logging.info(f"Acting as USER_ID = {config.USER_ID}")
|
||||||
|
|
||||||
client = mqtt_client.Client()
|
client = mqtt_client.Client()
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.connect(broker, port)
|
client.connect(broker, port)
|
||||||
@@ -29,17 +31,18 @@ def publish(client, topic, datasource):
|
|||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
logging.info(f"Published to {topic}: {msg[:50]}...")
|
logging.debug(f"Published to {topic}: {msg[:50]}...")
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status != 0:
|
if status != 0:
|
||||||
print(f"Failed to send message to topic {topic}")
|
logging.error(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
|
logging.basicConfig(level = logging.INFO)
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource)
|
publish(client, config.MQTT_TOPIC, datasource)
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
|
|||||||
|
|
||||||
|
|
||||||
class AccelerometerSchema(Schema):
|
class AccelerometerSchema(Schema):
|
||||||
x = fields.Int()
|
x = fields.Float()
|
||||||
y = fields.Int()
|
y = fields.Float()
|
||||||
z = fields.Int()
|
z = fields.Float()
|
||||||
|
|||||||
@@ -1,19 +0,0 @@
|
|||||||
@startuml
|
|
||||||
actor Worker as worker
|
|
||||||
participant Agent as agent
|
|
||||||
participant "Edge" as edge
|
|
||||||
participant "Hub" as hub
|
|
||||||
participant "Store" as store
|
|
||||||
participant "MapView" as mapview
|
|
||||||
actor Spectator as spectator
|
|
||||||
|
|
||||||
worker -> agent : Start agent
|
|
||||||
agent -> edge : Send sensor data
|
|
||||||
edge -> edge : Classify road state
|
|
||||||
edge -> hub : Send processed data
|
|
||||||
hub -> hub : Save processed data
|
|
||||||
hub -> store : Send processed data batch
|
|
||||||
store -> store : Save processed data batch
|
|
||||||
store -> mapview : Send new rows
|
|
||||||
mapview -> spectator : Update paths and markers
|
|
||||||
@enduml
|
|
||||||
@@ -1,15 +0,0 @@
|
|||||||
@startuml
|
|
||||||
|Користувач|
|
|
||||||
start
|
|
||||||
:Натискає на іконку ями;
|
|
||||||
|Система|
|
|
||||||
:Запитує підтвердження видалення ями;
|
|
||||||
|Користувач|
|
|
||||||
:Підтверджує видалення ями;
|
|
||||||
|Система|
|
|
||||||
:Модифікує запис про яму;
|
|
||||||
note right #ff9999: Можлива виключна ситуація 1
|
|
||||||
:Видаляє яму;
|
|
||||||
|Користувач|
|
|
||||||
stop
|
|
||||||
@enduml
|
|
||||||
@@ -1,23 +0,0 @@
|
|||||||
@startuml
|
|
||||||
rectangle IoT-Systems {
|
|
||||||
usecase "Collect telemetry (accelerometer + GPS)" as uc1
|
|
||||||
usecase "Send telemetry" as uc2
|
|
||||||
usecase "Process telemetry" as uc3
|
|
||||||
usecase "Determine road condition (pothole / bump /\nnormal)" as uc4
|
|
||||||
usecase "View road defect marks" as uc5
|
|
||||||
usecase "View route on map" as uc6
|
|
||||||
}
|
|
||||||
|
|
||||||
rectangle "The user is the card operator" as uc10
|
|
||||||
rectangle "Sensor Agent\n(Device/STM32/Emulator)" as uc11
|
|
||||||
|
|
||||||
uc11 - uc1
|
|
||||||
uc11 - uc2
|
|
||||||
|
|
||||||
uc10 - uc5
|
|
||||||
uc10 - uc6
|
|
||||||
|
|
||||||
uc2 -.|> uc3 : <<include>>
|
|
||||||
uc3 -.|> uc4 : <<include>>
|
|
||||||
|
|
||||||
@enduml
|
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
from app.entities.agent_data import AgentData
|
from app.entities.agent_data import AgentData
|
||||||
from app.entities.processed_agent_data import ProcessedAgentData
|
from app.entities.processed_agent_data import ProcessedAgentData
|
||||||
|
|
||||||
|
_last_detection_state = {}
|
||||||
|
|
||||||
def process_agent_data(
|
def process_agent_data(
|
||||||
agent_data: AgentData,
|
agent_data: AgentData,
|
||||||
@@ -12,8 +13,24 @@ def process_agent_data(
|
|||||||
Returns:
|
Returns:
|
||||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||||
"""
|
"""
|
||||||
# Implement it
|
user_id = agent_data.user_id
|
||||||
|
road_state = "normal"
|
||||||
|
|
||||||
|
last_detection_state = _last_detection_state.get(user_id, False)
|
||||||
|
|
||||||
|
if (agent_data.accelerometer.z < 0.6):
|
||||||
|
road_state = "pothole"
|
||||||
|
elif (agent_data.accelerometer.z > 1.2):
|
||||||
|
road_state = "bump"
|
||||||
|
|
||||||
|
detection_happened = road_state != "normal"
|
||||||
|
|
||||||
|
if not (not last_detection_state and detection_happened):
|
||||||
|
road_state = "normal"
|
||||||
|
|
||||||
|
_last_detection_state[user_id] = detection_happened
|
||||||
|
|
||||||
return ProcessedAgentData(
|
return ProcessedAgentData(
|
||||||
road_state="normal",
|
road_state=road_state,
|
||||||
agent_data=agent_data
|
agent_data=agent_data
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# WebSocket subscriptions
|
# WebSocket subscriptions
|
||||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
subscriptions: Set[WebSocket] = set()
|
||||||
|
|
||||||
|
|
||||||
# FastAPI WebSocket endpoint
|
# FastAPI WebSocket endpoint
|
||||||
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
|
|||||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||||
await websocket.accept()
|
await websocket.accept()
|
||||||
|
|
||||||
if user_id not in subscriptions:
|
subscriptions.add(websocket)
|
||||||
subscriptions[user_id] = set()
|
|
||||||
|
|
||||||
subscriptions[user_id].add(websocket)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# send already available data
|
# send already available data
|
||||||
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
|||||||
for i in jsonable_data:
|
for i in jsonable_data:
|
||||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
await websocket.send_json(json.dumps(jsonable_data))
|
for i in jsonable_data:
|
||||||
|
await websocket.send_json(json.dumps([i]))
|
||||||
|
|
||||||
# receive forever
|
# receive forever
|
||||||
while True:
|
while True:
|
||||||
await websocket.receive_text()
|
await websocket.receive_text()
|
||||||
except WebSocketDisconnect:
|
except WebSocketDisconnect:
|
||||||
subscriptions[user_id].remove(websocket)
|
subscriptions.remove(websocket)
|
||||||
|
|
||||||
|
|
||||||
# Function to send data to subscribed users
|
# Function to send data to subscribed users
|
||||||
async def send_data_to_subscribers(user_id: int, data):
|
async def send_data_to_subscribers(data):
|
||||||
if user_id in subscriptions:
|
for websocket in subscriptions:
|
||||||
for websocket in subscriptions[user_id]:
|
await websocket.send_json(json.dumps([data]))
|
||||||
await websocket.send_json(json.dumps(data))
|
|
||||||
|
|
||||||
|
|
||||||
# FastAPI CRUDL endpoints
|
# FastAPI CRUDL endpoints
|
||||||
@@ -100,7 +97,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
|||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
for record in created_records:
|
for record in created_records:
|
||||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
await send_data_to_subscribers(jsonable_encoder(record))
|
||||||
return created_records
|
return created_records
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
session.rollback()
|
session.rollback()
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import sys
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
print("Checking for dead containers...")
|
print("Checking for dead containers...")
|
||||||
|
|
||||||
@@ -14,6 +15,9 @@ for i in statuses:
|
|||||||
if not i[status_index:].startswith("Up "):
|
if not i[status_index:].startswith("Up "):
|
||||||
service_name = i[name_index:]
|
service_name = i[name_index:]
|
||||||
print(f"Crash detected in {service_name}")
|
print(f"Crash detected in {service_name}")
|
||||||
|
print(f"docker logs for the container:\n")
|
||||||
|
os.system(f"docker logs {i.split(' ')[0]}")
|
||||||
|
print()
|
||||||
exit_code = 1
|
exit_code = 1
|
||||||
|
|
||||||
sys.exit(exit_code)
|
sys.exit(exit_code)
|
||||||
|
|||||||
Reference in New Issue
Block a user