diff --git a/.gitignore b/.gitignore index 24a7bbe..3f20040 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ agent/docker/mosquitto/data/ -agent/docker/mosquitto/log/ \ No newline at end of file +agent/docker/mosquitto/log/ + +.idea/ \ No newline at end of file diff --git a/agent/src/__init__.py b/agent/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/agent/src/data/parking.csv b/agent/src/data/parking.csv new file mode 100644 index 0000000..c006344 --- /dev/null +++ b/agent/src/data/parking.csv @@ -0,0 +1,22 @@ +longitude,latitude,empty_count +50.450386085935094,30.524547100067142,10 +50.450386085935094,30.524547100067142,11 +50.450386085935094,30.524547100067142,13 +50.450386085935094,30.524547100067142,15 +50.450386085935094,30.524547100067142,7 +50.450386085935094,30.524547100067142,9 +50.450386085935094,30.524547100067142,4 +50.450386085935094,30.524547100067142,0 +50.450386085935094,30.524547100067142,0 +50.450386085935094,30.524547100067142,3 +50.450386085935094,30.524547100067142,4 +50.450069433207545,30.52406822530458,16 +50.450069433207545,30.52406822530458,20 +50.450069433207545,30.52406822530458,25 +50.450069433207545,30.52406822530458,30 +50.450069433207545,30.52406822530458,29 +50.450069433207545,30.52406822530458,12 +50.450069433207545,30.52406822530458,10 +50.450069433207545,30.52406822530458,14 +50.450069433207545,30.52406822530458,3 +50.450069433207545,30.52406822530458,2 diff --git a/agent/src/domain/aggregated_data.py b/agent/src/domain/aggregated_data.py index 0387ee2..8d76335 100644 --- a/agent/src/domain/aggregated_data.py +++ b/agent/src/domain/aggregated_data.py @@ -1,13 +1,16 @@ from dataclasses import dataclass from datetime import datetime + from domain.accelerometer import Accelerometer from domain.gps import Gps +from domain.parking import Parking @dataclass class AggregatedData: accelerometer: Accelerometer gps: Gps + parking: Parking timestamp: datetime user_id: int diff --git a/agent/src/domain/parking.py b/agent/src/domain/parking.py new file mode 100644 index 0000000..9783148 --- /dev/null +++ b/agent/src/domain/parking.py @@ -0,0 +1,9 @@ +from dataclasses import dataclass + +from domain.gps import Gps + + +@dataclass +class Parking: + empty_count: int + gps: Gps diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 363f1df..0425ca6 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -4,6 +4,7 @@ from datetime import datetime from pathlib import Path from typing import Optional, List +from domain.parking import Parking from domain.accelerometer import Accelerometer from domain.gps import Gps from domain.aggregated_data import AggregatedData @@ -12,12 +13,22 @@ import config class FileDatasource: - def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: + def __init__( + self, + accelerometer_filename: str, + gps_filename: str, + park_filename: str, + ) -> None: + self.accelerometer_filename = accelerometer_filename + self.park_filename = park_filename self.gps_filename = gps_filename + self._park_f = None self._acc_f = None self._gps_f = None + + self._park_reader: Optional[csv.reader] = None self._acc_reader: Optional[csv.reader] = None self._gps_reader: Optional[csv.reader] = None @@ -30,6 +41,8 @@ class FileDatasource: if not Path(self.accelerometer_filename).exists(): raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}") + if not Path(self.park_filename).exists(): + raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}") if not Path(self.gps_filename).exists(): raise FileNotFoundError(f"GPS file not found: {self.gps_filename}") @@ -47,9 +60,11 @@ class FileDatasource: raise RuntimeError("Datasource is not started. Call startReading() before read().") acc_row = self._get_next_row(self._acc_reader, source="acc") + park_row = self._get_next_row(self._park_reader, source="park") gps_row = self._get_next_row(self._gps_reader, source="gps") acc = self._parse_acc(acc_row) + park = self._parse_park(park_row) gps = self._parse_gps(gps_row) # IMPORTANT: timing belongs to datasource (not MQTT / main.py) @@ -59,6 +74,7 @@ class FileDatasource: return AggregatedData( accelerometer=acc, gps=gps, + parking=park, timestamp=datetime.utcnow(), user_id=config.USER_ID, ) @@ -69,14 +85,17 @@ class FileDatasource: self._close_files() self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") + self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8") self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + self._park_reader = csv.reader(self._park_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) # File pointer is already at 0 right after open(), so no need to rewind here. # Skip header row once. next(self._acc_reader, None) + next(self._park_reader, None) next(self._gps_reader, None) def _close_files(self) -> None: @@ -88,8 +107,10 @@ class FileDatasource: pass self._acc_f = None + self._park_f = None self._gps_f = None self._acc_reader = None + self._park_reader = None self._gps_reader = None def _rewind_acc(self) -> None: @@ -106,6 +127,13 @@ class FileDatasource: self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) next(self._gps_reader, None) # skip header row + def _rewind_park(self) -> None: + if self._park_f is None: + raise RuntimeError("GPS file is not open.") + self._park_f.seek(0) + self._park_reader = csv.reader(self._park_f, skipinitialspace=True) + next(self._park_reader, None) # skip header row + def _get_next_row(self, reader, source: str) -> List[str]: """Get the next valid row from the reader.""" if reader is None: @@ -118,6 +146,10 @@ class FileDatasource: if source == "acc": self._rewind_acc() reader = self._acc_reader + + elif source == 'park': + self._rewind_park() + reader = self._park_reader else: self._rewind_gps() reader = self._gps_reader @@ -148,4 +180,17 @@ class FileDatasource: raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") lon = float(row[0]) lat = float(row[1]) - return Gps(longitude=lon, latitude=lat) \ No newline at end of file + return Gps(longitude=lon, latitude=lat) + + @staticmethod + def _parse_park(row: List[str]) -> Parking: + if len(row) < 2: + raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") + lon = float(row[0]) + lat = float(row[1]) + empty_count = int(row[2]) + + return Parking( + gps=Gps(longitude=lon, latitude=lat), + empty_count=empty_count + ) diff --git a/agent/src/main.py b/agent/src/main.py index 5513bf4..ce17d6b 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -37,10 +37,10 @@ def run(): # Prepare mqtt client client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) # Prepare datasource - datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") + datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv") # Infinity publish data publish(client, config.MQTT_TOPIC, datasource) if __name__ == "__main__": - run() \ No newline at end of file + run() diff --git a/agent/src/schema/aggregated_data_schema.py b/agent/src/schema/aggregated_data_schema.py index e6fa482..74fe497 100644 --- a/agent/src/schema/aggregated_data_schema.py +++ b/agent/src/schema/aggregated_data_schema.py @@ -1,10 +1,12 @@ from marshmallow import Schema, fields from schema.accelerometer_schema import AccelerometerSchema from schema.gps_schema import GpsSchema +from schema.parking_schema import ParkingSchema class AggregatedDataSchema(Schema): accelerometer = fields.Nested(AccelerometerSchema) gps = fields.Nested(GpsSchema) + parking = fields.Nested(ParkingSchema) timestamp = fields.DateTime("iso") user_id = fields.Int() diff --git a/agent/src/schema/parking_schema.py b/agent/src/schema/parking_schema.py new file mode 100644 index 0000000..eabf208 --- /dev/null +++ b/agent/src/schema/parking_schema.py @@ -0,0 +1,8 @@ +from marshmallow import Schema, fields + +from schema.gps_schema import GpsSchema + + +class ParkingSchema(Schema): + gps = fields.Nested(GpsSchema) + empty_count = fields.Int()