Compare commits

..

1 Commits

Author SHA1 Message Date
e45faeb281 updated compose file 2026-02-22 11:22:19 +01:00
4 changed files with 28 additions and 145 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
agent/docker/mosquitto/data/
agent/docker/mosquitto/log/

View File

@@ -1,4 +1,3 @@
version: "3.9"
name: "road_vision"
services:
mqtt:

View File

@@ -1,9 +1,5 @@
import csv
import time
from csv import reader
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from domain.accelerometer import Accelerometer
from domain.gps import Gps
from domain.aggregated_data import AggregatedData
@@ -11,141 +7,24 @@ import config
class FileDatasource:
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
self.accelerometer_filename = accelerometer_filename
self.gps_filename = gps_filename
self._acc_f = None
self._gps_f = None
self._acc_reader: Optional[csv.reader] = None
self._gps_reader: Optional[csv.reader] = None
self._started = False
def startReading(self, *args, **kwargs):
"""Must be called before read()"""
if self._started:
return
if not Path(self.accelerometer_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
if not Path(self.gps_filename).exists():
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
self._open_files()
self._started = True
def stopReading(self, *args, **kwargs):
"""Must be called when finishing reading"""
self._close_files()
self._started = False
def __init__(
self,
accelerometer_filename: str,
gps_filename: str,
) -> None:
pass
def read(self) -> AggregatedData:
"""Return one combined sample (acc + gps)."""
if not self._started:
raise RuntimeError("Datasource is not started. Call startReading() before read().")
acc_row = self._get_next_row(self._acc_reader, source="acc")
gps_row = self._get_next_row(self._gps_reader, source="gps")
acc = self._parse_acc(acc_row)
gps = self._parse_gps(gps_row)
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
if config.DELAY and config.DELAY > 0:
time.sleep(float(config.DELAY))
"""Метод повертає дані отримані з датчиків"""
return AggregatedData(
accelerometer=acc,
gps=gps,
timestamp=datetime.utcnow(),
user_id=config.USER_ID,
Accelerometer(1, 2, 3),
Gps(4, 5),
datetime.now(),
config.USER_ID,
)
# ---------------- internal ----------------
def startReading(self, *args, **kwargs):
"""Метод повинен викликатись перед початком читання даних"""
def _open_files(self) -> None:
self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
# File pointer is already at 0 right after open(), so no need to rewind here.
# Skip header row once.
next(self._acc_reader, None)
next(self._gps_reader, None)
def _close_files(self) -> None:
for f in (self._acc_f, self._gps_f):
try:
if f is not None:
f.close()
except Exception:
pass
self._acc_f = None
self._gps_f = None
self._acc_reader = None
self._gps_reader = None
def _rewind_acc(self) -> None:
if self._acc_f is None:
raise RuntimeError("Accelerometer file is not open.")
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
next(self._acc_reader, None) # skip header row
def _rewind_gps(self) -> None:
if self._gps_f is None:
raise RuntimeError("GPS file is not open.")
self._gps_f.seek(0)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
next(self._gps_reader, None) # skip header row
def _get_next_row(self, reader, source: str) -> List[str]:
"""Get the next valid row from the reader."""
if reader is None:
raise RuntimeError("Reader is not initialized.")
while True:
row = next(reader, None)
if row is None:
# EOF -> rewind & continue
if source == "acc":
self._rewind_acc()
reader = self._acc_reader
else:
self._rewind_gps()
reader = self._gps_reader
continue
if not row or not any(cell.strip() for cell in row):
continue
return row
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
return Accelerometer(x=x, y=y, z=z)
@staticmethod
def _parse_gps(row: List[str]) -> Gps:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
return Gps(longitude=lon, latitude=lat)
def stopReading(self, *args, **kwargs):
"""Метод повинен викликатись для закінчення читання даних"""

View File

@@ -1,4 +1,6 @@
from paho.mqtt import client as mqtt_client
import json
import time
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import config
@@ -22,14 +24,19 @@ def connect_mqtt(broker, port):
return client
def publish(client, topic, datasource):
def publish(client, topic, datasource, delay):
datasource.startReading()
while True:
time.sleep(delay)
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status != 0:
if status == 0:
pass
# print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
@@ -37,9 +44,9 @@ def run():
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource)
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
if __name__ == "__main__":