import csv
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, List

from domain.parking import Parking
from domain.accelerometer import Accelerometer
from domain.gps import Gps
from domain.aggregated_data import AggregatedData
import config


class FileDatasource:
    """Replay accelerometer, GPS and parking samples from three CSV files.

    Each file is expected to start with one header row, which is skipped.
    Rows are consumed in lockstep (one row from each file per ``read()``);
    when a file reaches EOF it is rewound independently, so the sources
    cycle forever. Blank rows are ignored.

    Usage: ``startReading()`` -> ``read()`` (repeatedly) -> ``stopReading()``.
    """

    def __init__(
        self,
        accelerometer_filename: str,
        gps_filename: str,
        park_filename: str,
    ) -> None:
        """Store the file paths; no file is opened until ``startReading()``."""
        self.accelerometer_filename = accelerometer_filename
        self.park_filename = park_filename
        self.gps_filename = gps_filename

        # Open file handles (None while the datasource is stopped).
        self._park_f = None
        self._acc_f = None
        self._gps_f = None

        # CSV readers bound to the handles above.
        self._park_reader: Optional[csv.reader] = None
        self._acc_reader: Optional[csv.reader] = None
        self._gps_reader: Optional[csv.reader] = None

        self._started = False

    def startReading(self, *args, **kwargs):
        """Open all three files. Must be called before ``read()``.

        Calling it again while already started is a no-op.

        Raises:
            FileNotFoundError: if any of the configured files does not exist.
        """
        if self._started:
            return

        required = (
            ("Accelerometer", self.accelerometer_filename),
            ("Parking", self.park_filename),
            ("GPS", self.gps_filename),
        )
        for label, filename in required:
            if not Path(filename).exists():
                raise FileNotFoundError(f"{label} file not found: {filename}")

        self._open_files()
        self._started = True

    def stopReading(self, *args, **kwargs):
        """Close all files. Must be called when finishing reading."""
        self._close_files()
        self._started = False

    def read(self) -> AggregatedData:
        """Return one combined sample (acc + gps + parking).

        Sleeps for ``config.DELAY`` seconds (when positive) before returning,
        so the caller can poll in a tight loop.

        Raises:
            RuntimeError: if the datasource was not started, or a file
                contains no data rows.
            ValueError: if a row cannot be parsed.
        """
        if not self._started:
            raise RuntimeError("Datasource is not started. Call startReading() before read().")

        acc_row = self._get_next_row(self._acc_reader, source="acc")
        park_row = self._get_next_row(self._park_reader, source="park")
        gps_row = self._get_next_row(self._gps_reader, source="gps")

        acc = self._parse_acc(acc_row)
        park = self._parse_park(park_row)
        gps = self._parse_gps(gps_row)

        # IMPORTANT: timing belongs to datasource (not MQTT / main.py)
        if config.DELAY and config.DELAY > 0:
            time.sleep(float(config.DELAY))

        # NOTE(review): utcnow() yields a naive datetime and is deprecated
        # since Python 3.12. Kept as-is because switching to an aware
        # datetime may change how downstream consumers serialize the value.
        return AggregatedData(
            accelerometer=acc,
            gps=gps,
            parking=park,
            timestamp=datetime.utcnow(),
            user_id=config.USER_ID,
        )

    # ---------------- internal ----------------

    @staticmethod
    def _new_reader(f):
        """Create a CSV reader over *f* and skip its header row."""
        reader = csv.reader(f, skipinitialspace=True)
        next(reader, None)  # skip header row
        return reader

    def _open_files(self) -> None:
        """(Re)open all three files and position the readers after the header."""
        self._close_files()
        try:
            self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
            self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
            self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")

            # File pointer is already at 0 right after open(), so no need to
            # rewind here; _new_reader skips the header row once.
            self._acc_reader = self._new_reader(self._acc_f)
            self._park_reader = self._new_reader(self._park_f)
            self._gps_reader = self._new_reader(self._gps_f)
        except Exception:
            # Do not leak the handles that were opened before the failure.
            self._close_files()
            raise

    def _close_files(self) -> None:
        """Close every open file (best effort) and reset handles and readers."""
        for f in (self._acc_f, self._park_f, self._gps_f):
            if f is None:
                continue
            try:
                f.close()
            except OSError:
                # Best effort: a failing close must not prevent closing the rest.
                pass

        self._acc_f = None
        self._park_f = None
        self._gps_f = None
        self._acc_reader = None
        self._park_reader = None
        self._gps_reader = None

    def _rewind_acc(self) -> None:
        """Restart the accelerometer file from its first data row."""
        if self._acc_f is None:
            raise RuntimeError("Accelerometer file is not open.")
        self._acc_f.seek(0)
        self._acc_reader = self._new_reader(self._acc_f)

    def _rewind_gps(self) -> None:
        """Restart the GPS file from its first data row."""
        if self._gps_f is None:
            raise RuntimeError("GPS file is not open.")
        self._gps_f.seek(0)
        self._gps_reader = self._new_reader(self._gps_f)

    def _rewind_park(self) -> None:
        """Restart the parking file from its first data row."""
        if self._park_f is None:
            raise RuntimeError("Parking file is not open.")
        self._park_f.seek(0)
        self._park_reader = self._new_reader(self._park_f)

    def _get_next_row(self, reader, source: str) -> List[str]:
        """Get the next non-blank row, rewinding the file on EOF.

        Args:
            reader: the CSV reader to pull from.
            source: "acc", "park", or anything else (treated as GPS);
                selects which file is rewound on EOF.

        Raises:
            RuntimeError: if *reader* is None, or the file has no data rows
                at all (EOF is hit again immediately after a rewind).
        """
        if reader is None:
            raise RuntimeError("Reader is not initialized.")

        rewound = False
        while True:
            row = next(reader, None)

            if row is None:
                # Second EOF without a usable row in between: the file has
                # no data, so rewinding again would loop forever.
                if rewound:
                    raise RuntimeError(f"No data rows available in '{source}' file.")

                # EOF -> rewind & continue
                if source == "acc":
                    self._rewind_acc()
                    reader = self._acc_reader
                elif source == "park":
                    self._rewind_park()
                    reader = self._park_reader
                else:
                    self._rewind_gps()
                    reader = self._gps_reader
                rewound = True
                continue

            # Skip empty rows and rows made only of whitespace cells.
            if not any(cell.strip() for cell in row):
                continue

            return row

    @staticmethod
    def _parse_acc(row: List[str]) -> Accelerometer:
        """Build an Accelerometer from the first three (integer) columns.

        Raises:
            ValueError: if the row is too short or a value is not an integer.
        """
        if len(row) < 3:
            raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
        try:
            x = int(row[0])
            y = int(row[1])
            z = int(row[2])
        except ValueError as e:
            raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
        return Accelerometer(x=x, y=y, z=z)

    @staticmethod
    def _parse_gps(row: List[str]) -> Gps:
        """Build a Gps from the first two (float) columns: longitude, latitude.

        Raises:
            ValueError: if the row is too short or a value is not a number.
        """
        if len(row) < 2:
            raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
        try:
            lon = float(row[0])
            lat = float(row[1])
        except ValueError as e:
            raise ValueError(f"Invalid GPS values (expected floats): {row}") from e
        return Gps(longitude=lon, latitude=lat)

    @staticmethod
    def _parse_park(row: List[str]) -> Parking:
        """Build a Parking from three columns: longitude, latitude, empty_count.

        Raises:
            ValueError: if the row is too short or a value has the wrong type.
        """
        # Three columns are read below, so require three (not two).
        if len(row) < 3:
            raise ValueError(
                f"Parking row must have 3 values (longitude,latitude,empty_count). Got: {row}"
            )
        try:
            lon = float(row[0])
            lat = float(row[1])
            empty_count = int(row[2])
        except ValueError as e:
            raise ValueError(
                f"Invalid parking values (expected float,float,integer): {row}"
            ) from e
        return Parking(
            gps=Gps(longitude=lon, latitude=lat),
            empty_count=empty_count,
        )