SCRUM-34: slobodeniuk parking signals

This commit is contained in:
Senya 2026-02-26 12:11:07 +02:00
parent 7ddfb68b02
commit f9ef916331
7 changed files with 74 additions and 7 deletions

4
.gitignore vendored
View File

@ -1,2 +1,4 @@
agent/docker/mosquitto/data/ agent/docker/mosquitto/data/
agent/docker/mosquitto/log/ agent/docker/mosquitto/log/
.idea/

0
agent/src/__init__.py Normal file
View File

View File

@ -1,13 +1,16 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from domain.accelerometer import Accelerometer
from domain.gps import Gps from agent.src.domain.accelerometer import Accelerometer
from agent.src.domain.gps import Gps
from agent.src.domain.parking import Parking
@dataclass @dataclass
class AggregatedData: class AggregatedData:
accelerometer: Accelerometer accelerometer: Accelerometer
gps: Gps gps: Gps
parking: Parking
timestamp: datetime timestamp: datetime
user_id: int user_id: int

View File

@ -0,0 +1,9 @@
from dataclasses import dataclass
from agent.src.domain.gps import Gps
@dataclass
class Parking:
empty_count: int
gps: Gps

View File

@ -4,6 +4,7 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional, List from typing import Optional, List
from domain.parking import Parking
from domain.accelerometer import Accelerometer from domain.accelerometer import Accelerometer
from domain.gps import Gps from domain.gps import Gps
from domain.aggregated_data import AggregatedData from domain.aggregated_data import AggregatedData
@ -12,12 +13,22 @@ import config
class FileDatasource: class FileDatasource:
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: def __init__(
self,
accelerometer_filename: str,
gps_filename: str,
park_filename: str,
) -> None:
self.accelerometer_filename = accelerometer_filename self.accelerometer_filename = accelerometer_filename
self.park_filename = park_filename
self.gps_filename = gps_filename self.gps_filename = gps_filename
self._park_f = None
self._acc_f = None self._acc_f = None
self._gps_f = None self._gps_f = None
self._park_reader: Optional[csv.reader] = None
self._acc_reader: Optional[csv.reader] = None self._acc_reader: Optional[csv.reader] = None
self._gps_reader: Optional[csv.reader] = None self._gps_reader: Optional[csv.reader] = None
@ -30,6 +41,8 @@ class FileDatasource:
if not Path(self.accelerometer_filename).exists(): if not Path(self.accelerometer_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}") raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
if not Path(self.park_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}")
if not Path(self.gps_filename).exists(): if not Path(self.gps_filename).exists():
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}") raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
@ -47,9 +60,11 @@ class FileDatasource:
raise RuntimeError("Datasource is not started. Call startReading() before read().") raise RuntimeError("Datasource is not started. Call startReading() before read().")
acc_row = self._get_next_row(self._acc_reader, source="acc") acc_row = self._get_next_row(self._acc_reader, source="acc")
park_row = self._get_next_row(self._park_reader, source="park")
gps_row = self._get_next_row(self._gps_reader, source="gps") gps_row = self._get_next_row(self._gps_reader, source="gps")
acc = self._parse_acc(acc_row) acc = self._parse_acc(acc_row)
park = self._parse_park(park_row)
gps = self._parse_gps(gps_row) gps = self._parse_gps(gps_row)
# IMPORTANT: timing belongs to datasource (not MQTT / main.py) # IMPORTANT: timing belongs to datasource (not MQTT / main.py)
@ -59,6 +74,7 @@ class FileDatasource:
return AggregatedData( return AggregatedData(
accelerometer=acc, accelerometer=acc,
gps=gps, gps=gps,
parking=park,
timestamp=datetime.utcnow(), timestamp=datetime.utcnow(),
user_id=config.USER_ID, user_id=config.USER_ID,
) )
@ -69,14 +85,17 @@ class FileDatasource:
self._close_files() self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
# File pointer is already at 0 right after open(), so no need to rewind here. # File pointer is already at 0 right after open(), so no need to rewind here.
# Skip header row once. # Skip header row once.
next(self._acc_reader, None) next(self._acc_reader, None)
next(self._park_reader, None)
next(self._gps_reader, None) next(self._gps_reader, None)
def _close_files(self) -> None: def _close_files(self) -> None:
@ -88,8 +107,10 @@ class FileDatasource:
pass pass
self._acc_f = None self._acc_f = None
self._park_f = None
self._gps_f = None self._gps_f = None
self._acc_reader = None self._acc_reader = None
self._park_reader = None
self._gps_reader = None self._gps_reader = None
def _rewind_acc(self) -> None: def _rewind_acc(self) -> None:
@ -106,6 +127,13 @@ class FileDatasource:
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
next(self._gps_reader, None) # skip header row next(self._gps_reader, None) # skip header row
def _rewind_park(self) -> None:
if self._park_f is None:
raise RuntimeError("GPS file is not open.")
self._park_f.seek(0)
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
next(self._park_reader, None) # skip header row
def _get_next_row(self, reader, source: str) -> List[str]: def _get_next_row(self, reader, source: str) -> List[str]:
"""Get the next valid row from the reader.""" """Get the next valid row from the reader."""
if reader is None: if reader is None:
@ -118,6 +146,10 @@ class FileDatasource:
if source == "acc": if source == "acc":
self._rewind_acc() self._rewind_acc()
reader = self._acc_reader reader = self._acc_reader
elif source == 'park':
self._rewind_park()
reader = self._park_reader
else: else:
self._rewind_gps() self._rewind_gps()
reader = self._gps_reader reader = self._gps_reader
@ -148,4 +180,17 @@ class FileDatasource:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0]) lon = float(row[0])
lat = float(row[1]) lat = float(row[1])
return Gps(longitude=lon, latitude=lat) return Gps(longitude=lon, latitude=lat)
@staticmethod
def _parse_park(row: List[str]) -> Parking:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
empty_count = int(row[2])
return Parking(
gps=Gps(longitude=lon, latitude=lat),
empty_count=empty_count,
)

View File

@ -37,10 +37,10 @@ def run():
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource) publish(client, config.MQTT_TOPIC, datasource)
if __name__ == "__main__": if __name__ == "__main__":
run() run()

View File

@ -0,0 +1,8 @@
from marshmallow import Schema, fields
from agent.src.schema.gps_schema import GpsSchema
class ParkingSchema(Schema):
gps = fields.Nested(GpsSchema)
empty_count = fields.Int()