diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..24a7bbe --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +agent/docker/mosquitto/data/ +agent/docker/mosquitto/log/ \ No newline at end of file diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 54aec29..363f1df 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -1,5 +1,9 @@ -from csv import reader +import csv +import time from datetime import datetime +from pathlib import Path +from typing import Optional, List + from domain.accelerometer import Accelerometer from domain.gps import Gps from domain.aggregated_data import AggregatedData @@ -7,24 +11,141 @@ import config class FileDatasource: - def __init__( - self, - accelerometer_filename: str, - gps_filename: str, - ) -> None: - pass - def read(self) -> AggregatedData: - """Метод повертає дані отримані з датчиків""" - return AggregatedData( - Accelerometer(1, 2, 3), - Gps(4, 5), - datetime.now(), - config.USER_ID, - ) + def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: + self.accelerometer_filename = accelerometer_filename + self.gps_filename = gps_filename + + self._acc_f = None + self._gps_f = None + self._acc_reader: Optional[csv.reader] = None + self._gps_reader: Optional[csv.reader] = None + + self._started = False def startReading(self, *args, **kwargs): - """Метод повинен викликатись перед початком читання даних""" + """Must be called before read()""" + if self._started: + return + + if not Path(self.accelerometer_filename).exists(): + raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}") + if not Path(self.gps_filename).exists(): + raise FileNotFoundError(f"GPS file not found: {self.gps_filename}") + + self._open_files() + self._started = True def stopReading(self, *args, **kwargs): - """Метод повинен викликатись для закінчення читання даних""" + """Must be called when finishing reading""" + self._close_files() + self._started = False + + def read(self) -> AggregatedData: + """Return one combined sample (acc + gps).""" + if not self._started: + raise RuntimeError("Datasource is not started. Call startReading() before read().") + + acc_row = self._get_next_row(self._acc_reader, source="acc") + gps_row = self._get_next_row(self._gps_reader, source="gps") + + acc = self._parse_acc(acc_row) + gps = self._parse_gps(gps_row) + + # IMPORTANT: timing belongs to datasource (not MQTT / main.py) + if config.DELAY and config.DELAY > 0: + time.sleep(float(config.DELAY)) + + return AggregatedData( + accelerometer=acc, + gps=gps, + timestamp=datetime.utcnow(), + user_id=config.USER_ID, + ) + + # ---------------- internal ---------------- + + def _open_files(self) -> None: + self._close_files() + + self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") + self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") + + self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) + + # File pointer is already at 0 right after open(), so no need to rewind here. + # Skip header row once. + next(self._acc_reader, None) + next(self._gps_reader, None) + + def _close_files(self) -> None: + for f in (self._acc_f, self._gps_f): + try: + if f is not None: + f.close() + except Exception: + pass + + self._acc_f = None + self._gps_f = None + self._acc_reader = None + self._gps_reader = None + + def _rewind_acc(self) -> None: + if self._acc_f is None: + raise RuntimeError("Accelerometer file is not open.") + self._acc_f.seek(0) + self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + next(self._acc_reader, None) # skip header row + + def _rewind_gps(self) -> None: + if self._gps_f is None: + raise RuntimeError("GPS file is not open.") + self._gps_f.seek(0) + self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) + next(self._gps_reader, None) # skip header row + + def _get_next_row(self, reader, source: str) -> List[str]: + """Get the next valid row from the reader.""" + if reader is None: + raise RuntimeError("Reader is not initialized.") + + while True: + row = next(reader, None) + if row is None: + # EOF -> rewind & continue + if source == "acc": + self._rewind_acc() + reader = self._acc_reader + else: + self._rewind_gps() + reader = self._gps_reader + continue + + if not row or not any(cell.strip() for cell in row): + continue + + return row + + @staticmethod + def _parse_acc(row: List[str]) -> Accelerometer: + if len(row) < 3: + raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") + + try: + x = int(row[0]) + y = int(row[1]) + z = int(row[2]) + except ValueError as e: + raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e + + return Accelerometer(x=x, y=y, z=z) + + @staticmethod + def _parse_gps(row: List[str]) -> Gps: + if len(row) < 2: + raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") + lon = float(row[0]) + lat = float(row[1]) + return Gps(longitude=lon, latitude=lat) \ No newline at end of file diff --git a/agent/src/main.py b/agent/src/main.py index 52351bd..5513bf4 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -1,6 +1,4 @@ from paho.mqtt import client as mqtt_client -import json -import time from schema.aggregated_data_schema import AggregatedDataSchema from file_datasource import FileDatasource import config @@ -24,19 +22,14 @@ def connect_mqtt(broker, port): return client -def publish(client, topic, datasource, delay): +def publish(client, topic, datasource): datasource.startReading() while True: - time.sleep(delay) data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) - # result: [0, 1] status = result[0] - if status == 0: - pass - # print(f"Send `{msg}` to topic `{topic}`") - else: + if status != 0: print(f"Failed to send message to topic {topic}") @@ -44,10 +37,10 @@ def run(): # Prepare mqtt client client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) # Prepare datasource - datasource = FileDatasource("data/data.csv", "data/gps_data.csv") + datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") # Infinity publish data - publish(client, config.MQTT_TOPIC, datasource, config.DELAY) + publish(client, config.MQTT_TOPIC, datasource) if __name__ == "__main__": - run() + run() \ No newline at end of file