sem8-iot-test/agent/src/file_datasource.py

151 lines
4.9 KiB
Python
Raw Normal View History

import csv
import time
2024-02-02 22:51:54 +02:00
from datetime import datetime
from pathlib import Path
from typing import Optional, List
2024-02-16 15:34:20 +02:00
from domain.accelerometer import Accelerometer
from domain.gps import Gps
2024-02-02 22:51:54 +02:00
from domain.aggregated_data import AggregatedData
2024-02-16 15:34:20 +02:00
import config
2024-02-02 22:51:54 +02:00
class FileDatasource:
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
self.accelerometer_filename = accelerometer_filename
self.gps_filename = gps_filename
self._acc_f = None
self._gps_f = None
self._acc_reader: Optional[csv.reader] = None
self._gps_reader: Optional[csv.reader] = None
self._started = False
def startReading(self, *args, **kwargs):
"""Must be called before read()"""
if self._started:
return
if not Path(self.accelerometer_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
if not Path(self.gps_filename).exists():
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
self._open_files()
self._started = True
def stopReading(self, *args, **kwargs):
"""Must be called when finishing reading"""
self._close_files()
self._started = False
2024-02-02 22:51:54 +02:00
def read(self) -> AggregatedData:
"""Return one combined sample (acc + gps)."""
if not self._started:
raise RuntimeError("Datasource is not started. Call startReading() before read().")
acc_row = self._get_next_row(self._acc_reader, source="acc")
gps_row = self._get_next_row(self._gps_reader, source="gps")
acc = self._parse_acc(acc_row)
gps = self._parse_gps(gps_row)
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
if config.DELAY and config.DELAY > 0:
time.sleep(float(config.DELAY))
2024-02-16 15:34:20 +02:00
return AggregatedData(
accelerometer=acc,
gps=gps,
2026-02-24 14:21:41 +02:00
timestamp=datetime.utcnow(),
user_id=config.USER_ID,
2024-02-16 15:34:20 +02:00
)
2024-02-02 22:51:54 +02:00
# ---------------- internal ----------------
2024-02-02 22:51:54 +02:00
def _open_files(self) -> None:
self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
# File pointer is already at 0 right after open(), so no need to rewind here.
# Skip header row once.
next(self._acc_reader, None)
next(self._gps_reader, None)
def _close_files(self) -> None:
for f in (self._acc_f, self._gps_f):
try:
if f is not None:
f.close()
except Exception:
pass
self._acc_f = None
self._gps_f = None
self._acc_reader = None
self._gps_reader = None
def _rewind_acc(self) -> None:
if self._acc_f is None:
raise RuntimeError("Accelerometer file is not open.")
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
next(self._acc_reader, None) # skip header row
def _rewind_gps(self) -> None:
if self._gps_f is None:
raise RuntimeError("GPS file is not open.")
self._gps_f.seek(0)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
next(self._gps_reader, None) # skip header row
def _get_next_row(self, reader, source: str) -> List[str]:
"""Get the next valid row from the reader."""
if reader is None:
raise RuntimeError("Reader is not initialized.")
while True:
row = next(reader, None)
if row is None:
# EOF -> rewind & continue
if source == "acc":
self._rewind_acc()
reader = self._acc_reader
else:
self._rewind_gps()
reader = self._gps_reader
continue
if not row or not any(cell.strip() for cell in row):
continue
return row
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
2026-02-24 19:55:50 +02:00
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
2026-02-24 19:55:50 +02:00
return Accelerometer(x=x, y=y, z=z)
@staticmethod
def _parse_gps(row: List[str]) -> Gps:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
return Gps(longitude=lon, latitude=lat)