Compare commits
12 Commits
project/ko
...
project/sh
| Author | SHA1 | Date | |
|---|---|---|---|
| 121bd007b3 | |||
| db63eb6d79 | |||
| 77d6968297 | |||
| 0c2392dc0b | |||
|
|
65f767d38e | ||
| 0695e3d092 | |||
|
|
d6e094e6c0 | ||
|
|
2167eb2960 | ||
|
|
38374a6723 | ||
| c08612f71a | |||
| bde51ca5e1 | |||
| a204bb1676 |
@@ -72,8 +72,8 @@ class Datasource:
|
||||
)
|
||||
new_points = [
|
||||
(
|
||||
processed_agent_data.latitude,
|
||||
processed_agent_data.longitude,
|
||||
processed_agent_data.latitude,
|
||||
processed_agent_data.road_state,
|
||||
processed_agent_data.user_id
|
||||
)
|
||||
|
||||
@@ -2,6 +2,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
|
||||
@@ -32,7 +32,7 @@ class MapViewApp(App):
|
||||
Встановлює необхідні маркери, викликає функцію для оновлення мапи
|
||||
"""
|
||||
self.update()
|
||||
Clock.schedule_interval(self.update, 5)
|
||||
Clock.schedule_interval(self.update, 0.1)
|
||||
|
||||
def update(self, *args):
|
||||
"""
|
||||
@@ -87,6 +87,7 @@ class MapViewApp(App):
|
||||
self.car_markers[user_id].lat = lat
|
||||
self.car_markers[user_id].lon = lon
|
||||
|
||||
if user_id == 1:
|
||||
self.mapview.center_on(lat, lon)
|
||||
|
||||
def set_pothole_marker(self, point):
|
||||
|
||||
@@ -3,6 +3,6 @@ from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class Accelerometer:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
|
||||
@@ -15,6 +15,7 @@ class FileDatasource:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
acc_divisor: float,
|
||||
accelerometer_filename: str,
|
||||
gps_filename: str,
|
||||
park_filename: str,
|
||||
@@ -34,6 +35,8 @@ class FileDatasource:
|
||||
|
||||
self._started = False
|
||||
|
||||
self.acc_divisor = acc_divisor
|
||||
|
||||
def startReading(self, *args, **kwargs):
|
||||
"""Must be called before read()"""
|
||||
if self._started:
|
||||
@@ -160,15 +163,14 @@ class FileDatasource:
|
||||
|
||||
return row
|
||||
|
||||
@staticmethod
|
||||
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||
def _parse_acc(self, row: List[str]) -> Accelerometer:
|
||||
if len(row) < 3:
|
||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||
|
||||
try:
|
||||
x = int(row[0])
|
||||
y = int(row[1])
|
||||
z = int(row[2])
|
||||
x = int(row[0]) / self.acc_divisor
|
||||
y = int(row[1]) / self.acc_divisor
|
||||
z = int(row[2]) / self.acc_divisor
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||
|
||||
|
||||
@@ -16,6 +16,8 @@ def connect_mqtt(broker, port):
|
||||
print("Failed to connect {broker}:{port}, return code %d\n", rc)
|
||||
exit(rc) # Stop execution
|
||||
|
||||
logging.info(f"Acting as USER_ID = {config.USER_ID}")
|
||||
|
||||
client = mqtt_client.Client()
|
||||
client.on_connect = on_connect
|
||||
client.connect(broker, port)
|
||||
@@ -29,17 +31,18 @@ def publish(client, topic, datasource):
|
||||
data = datasource.read()
|
||||
msg = AggregatedDataSchema().dumps(data)
|
||||
result = client.publish(topic, msg)
|
||||
logging.info(f"Published to {topic}: {msg[:50]}...")
|
||||
logging.debug(f"Published to {topic}: {msg[:50]}...")
|
||||
status = result[0]
|
||||
if status != 0:
|
||||
print(f"Failed to send message to topic {topic}")
|
||||
logging.error(f"Failed to send message to topic {topic}")
|
||||
|
||||
|
||||
def run():
|
||||
logging.basicConfig(level = logging.INFO)
|
||||
# Prepare mqtt client
|
||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||
# Prepare datasource
|
||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
datasource = FileDatasource(16384.0, "data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
# Infinity publish data
|
||||
publish(client, config.MQTT_TOPIC, datasource)
|
||||
|
||||
|
||||
@@ -2,6 +2,6 @@ from marshmallow import Schema, fields
|
||||
|
||||
|
||||
class AccelerometerSchema(Schema):
|
||||
x = fields.Int()
|
||||
y = fields.Int()
|
||||
z = fields.Int()
|
||||
x = fields.Float()
|
||||
y = fields.Float()
|
||||
z = fields.Float()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from app.entities.agent_data import AgentData
|
||||
from app.entities.processed_agent_data import ProcessedAgentData
|
||||
|
||||
_last_detection_state = {}
|
||||
|
||||
def process_agent_data(
|
||||
agent_data: AgentData,
|
||||
@@ -12,8 +13,24 @@ def process_agent_data(
|
||||
Returns:
|
||||
processed_data_batch (ProcessedAgentData): Processed data containing the classified state of the road surface and agent data.
|
||||
"""
|
||||
# Implement it
|
||||
user_id = agent_data.user_id
|
||||
road_state = "normal"
|
||||
|
||||
last_detection_state = _last_detection_state.get(user_id, False)
|
||||
|
||||
if (agent_data.accelerometer.z < 0.6):
|
||||
road_state = "pothole"
|
||||
elif (agent_data.accelerometer.z > 1.2):
|
||||
road_state = "bump"
|
||||
|
||||
detection_happened = road_state != "normal"
|
||||
|
||||
if not (not last_detection_state and detection_happened):
|
||||
road_state = "normal"
|
||||
|
||||
_last_detection_state[user_id] = detection_happened
|
||||
|
||||
return ProcessedAgentData(
|
||||
road_state="normal",
|
||||
road_state=road_state,
|
||||
agent_data=agent_data
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ processed_agent_data = Table(
|
||||
)
|
||||
|
||||
# WebSocket subscriptions
|
||||
subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||
subscriptions: Set[WebSocket] = set()
|
||||
|
||||
|
||||
# FastAPI WebSocket endpoint
|
||||
@@ -41,10 +41,7 @@ subscriptions: Dict[int, Set[WebSocket]] = {}
|
||||
async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||
await websocket.accept()
|
||||
|
||||
if user_id not in subscriptions:
|
||||
subscriptions[user_id] = set()
|
||||
|
||||
subscriptions[user_id].add(websocket)
|
||||
subscriptions.add(websocket)
|
||||
|
||||
try:
|
||||
# send already available data
|
||||
@@ -55,20 +52,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
|
||||
for i in jsonable_data:
|
||||
i['timestamp'] = i['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
await websocket.send_json(json.dumps(jsonable_data))
|
||||
for i in jsonable_data:
|
||||
await websocket.send_json(json.dumps([i]))
|
||||
|
||||
# receive forever
|
||||
while True:
|
||||
await websocket.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
subscriptions[user_id].remove(websocket)
|
||||
subscriptions.remove(websocket)
|
||||
|
||||
|
||||
# Function to send data to subscribed users
|
||||
async def send_data_to_subscribers(user_id: int, data):
|
||||
if user_id in subscriptions:
|
||||
for websocket in subscriptions[user_id]:
|
||||
await websocket.send_json(json.dumps(data))
|
||||
async def send_data_to_subscribers(data):
|
||||
for websocket in subscriptions:
|
||||
await websocket.send_json(json.dumps([data]))
|
||||
|
||||
|
||||
# FastAPI CRUDL endpoints
|
||||
@@ -100,7 +97,7 @@ async def create_processed_agent_data(data: List[ProcessedAgentData], user_id: i
|
||||
session.commit()
|
||||
|
||||
for record in created_records:
|
||||
await send_data_to_subscribers(user_id, jsonable_encoder(record))
|
||||
await send_data_to_subscribers(jsonable_encoder(record))
|
||||
return created_records
|
||||
except Exception as err:
|
||||
session.rollback()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
print("Checking for dead containers...")
|
||||
|
||||
@@ -14,6 +15,9 @@ for i in statuses:
|
||||
if not i[status_index:].startswith("Up "):
|
||||
service_name = i[name_index:]
|
||||
print(f"Crash detected in {service_name}")
|
||||
print(f"docker logs for the container:\n")
|
||||
os.system(f"docker logs {i.split(' ')[0]}")
|
||||
print()
|
||||
exit_code = 1
|
||||
|
||||
sys.exit(exit_code)
|
||||
|
||||
Reference in New Issue
Block a user