diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 54aec29..145188b 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -1,5 +1,11 @@ -from csv import reader +from __future__ import annotations + +import csv +import time from datetime import datetime +from pathlib import Path +from typing import Optional, List + from domain.accelerometer import Accelerometer from domain.gps import Gps from domain.aggregated_data import AggregatedData @@ -7,24 +13,213 @@ import config class FileDatasource: - def __init__( - self, - accelerometer_filename: str, - gps_filename: str, - ) -> None: - pass - def read(self) -> AggregatedData: - """Метод повертає дані отримані з датчиків""" - return AggregatedData( - Accelerometer(1, 2, 3), - Gps(4, 5), - datetime.now(), - config.USER_ID, - ) + def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: + self.accelerometer_filename = accelerometer_filename + self.gps_filename = gps_filename + + self._acc_f = None + self._gps_f = None + self._acc_reader: Optional[csv.reader] = None + self._gps_reader: Optional[csv.reader] = None + + self._started = False + + # one-row buffers (supports CSVs with or without header) + self._acc_buf: Optional[List[str]] = None + self._gps_buf: Optional[List[str]] = None + + self._acc_has_header: Optional[bool] = None + self._gps_has_header: Optional[bool] = None def startReading(self, *args, **kwargs): - """Метод повинен викликатись перед початком читання даних""" + """Must be called before read()""" + if self._started: + return + + if not Path(self.accelerometer_filename).exists(): + raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}") + if not Path(self.gps_filename).exists(): + raise FileNotFoundError(f"GPS file not found: {self.gps_filename}") + + self._open_files() + self._started = True def stopReading(self, *args, **kwargs): - """Метод повинен викликатись для закінчення читання даних""" + """Must be called when finishing reading""" + self._close_files() + self._started = False + + def read(self) -> AggregatedData: + """Return one combined sample (acc + gps).""" + if not self._started: + raise RuntimeError("Datasource is not started. Call startReading() before read().") + + acc_row = self._next_acc_row() + gps_row = self._next_gps_row() + + acc = self._parse_acc(acc_row) + gps = self._parse_gps(gps_row) + + # IMPORTANT: timing belongs to datasource (not MQTT / main.py) + if config.DELAY and config.DELAY > 0: + time.sleep(float(config.DELAY)) + + return AggregatedData( + accelerometer=acc, + gps=gps, + time=datetime.utcnow(), + user_id=config.USER_ID, + ) + + # ---------------- internal ---------------- + + def _open_files(self) -> None: + self._close_files() + + self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") + self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") + + self._acc_reader = csv.reader(self._acc_f) + self._gps_reader = csv.reader(self._gps_f) + + self._acc_buf = None + self._gps_buf = None + + self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") + ) + self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") + ) + + def _close_files(self) -> None: + for f in (self._acc_f, self._gps_f): + try: + if f is not None: + f.close() + except Exception: + pass + + self._acc_f = None + self._gps_f = None + self._acc_reader = None + self._gps_reader = None + self._acc_buf = None + self._gps_buf = None + self._acc_has_header = None + self._gps_has_header = None + + def _rewind_acc(self) -> None: + if self._acc_f is None: + raise RuntimeError("Accelerometer file is not open.") + self._acc_f.seek(0) + self._acc_reader = csv.reader(self._acc_f) + self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") + ) + + def _rewind_gps(self) -> None: + if self._gps_f is None: + raise RuntimeError("GPS file is not open.") + self._gps_f.seek(0) + self._gps_reader = csv.reader(self._gps_f) + self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") + ) + + def _next_acc_row(self) -> List[str]: + if self._acc_reader is None: + raise RuntimeError("Accelerometer reader is not initialized.") + + while True: + if self._acc_buf is not None: + row = self._acc_buf + self._acc_buf = None + else: + row = next(self._acc_reader, None) + + if row is None: + # EOF -> rewind & continue + self._rewind_acc() + continue + + row = [c.strip() for c in row] + if not row or not any(row): + continue + + return row + + def _next_gps_row(self) -> List[str]: + if self._gps_reader is None: + raise RuntimeError("GPS reader is not initialized.") + + while True: + if self._gps_buf is not None: + row = self._gps_buf + self._gps_buf = None + else: + row = next(self._gps_reader, None) + + if row is None: + # EOF -> rewind & continue + self._rewind_gps() + continue + + row = [c.strip() for c in row] + if not row or not any(row): + continue + + return row + + @staticmethod + def _detect_header_and_buffer( + rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...] + ) -> tuple[bool, Optional[List[str]]]: + + first = None + while True: + first = next(rdr, None) + if first is None: + return False, None + first = [c.strip() for c in first] + if first and any(first): + break + + norm = [c.lower() for c in first] + + # Header if it contains the expected tokens + if all(tok in norm for tok in header_tokens): + return True, None + + # If first row is numeric-like and has enough columns => it's data (buffer it back) + if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]): + return False, first + + # Otherwise treat it as header-ish (skip it) + return True, None + + @staticmethod + def _parse_acc(row: List[str]) -> Accelerometer: + if len(row) < 3: + raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") + x = int(float(row[0])) + y = int(float(row[1])) + z = int(float(row[2])) + return Accelerometer(x=x, y=y, z=z) + + @staticmethod + def _parse_gps(row: List[str]) -> Gps: + if len(row) < 2: + raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") + lon = float(row[0]) + lat = float(row[1]) + return Gps(longitude=lon, latitude=lat) + + @staticmethod + def _is_number(s: str) -> bool: + try: + float(s) + return True + except Exception: + return False \ No newline at end of file diff --git a/agent/src/main.py b/agent/src/main.py index 4ab4539..9eeb9a4 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -1,6 +1,4 @@ from paho.mqtt import client as mqtt_client -import json -import time from schema.aggregated_data_schema import AggregatedDataSchema from file_datasource import FileDatasource import config @@ -24,23 +22,18 @@ def connect_mqtt(broker, port): return client -def publish(client, topic, datasource, delay, max_sends = None): +def publish(client, topic, datasource, max_sends = None): datasource.startReading() i = 0 while True: i += 1 - time.sleep(delay) data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) - # result: [0, 1] status = result[0] - if status == 0: - pass - # print(f"Send `{msg}` to topic `{topic}`") - else: + if status != 0: print(f"Failed to send message to topic {topic}") if max_sends and i >= max_sends: @@ -52,9 +45,9 @@ def run(): # Prepare mqtt client client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) # Prepare datasource - datasource = FileDatasource("data/data.csv", "data/gps_data.csv") + datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") # Infinity publish data - publish(client, config.MQTT_TOPIC, datasource, config.DELAY, getattr(config, "MAX_SENDS", None)) + publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None)) if __name__ == "__main__":