From c974ac32f64c8b0b8e3aeb4efc65bede3748fde4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Mon, 23 Feb 2026 22:01:11 +0200 Subject: [PATCH 01/15] =?UTF-8?q?=D0=A0=D0=B5=D0=B0=D0=BB=D1=96=D0=B7?= =?UTF-8?q?=D0=BE=D0=B2=D1=83=D1=8E=20=D0=B1=D0=B0=D0=B7=D0=BE=D0=B2=D0=B8?= =?UTF-8?q?=D0=B9=20FileReader=20=D1=82=D0=B0=20=D0=BF=D0=B5=D1=80=D0=B5?= =?UTF-8?q?=D0=BD=D0=BE=D1=88=D1=83=20sleep=20=D0=B4=D0=BE=20FileDatasourc?= =?UTF-8?q?e.read()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/src/file_datasource.py | 229 ++++++++++++++++++++++++++++++++--- agent/src/main.py | 17 +-- 2 files changed, 217 insertions(+), 29 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 54aec29..145188b 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -1,5 +1,11 @@ -from csv import reader +from __future__ import annotations + +import csv +import time from datetime import datetime +from pathlib import Path +from typing import Optional, List + from domain.accelerometer import Accelerometer from domain.gps import Gps from domain.aggregated_data import AggregatedData @@ -7,24 +13,213 @@ import config class FileDatasource: - def __init__( - self, - accelerometer_filename: str, - gps_filename: str, - ) -> None: - pass - def read(self) -> AggregatedData: - """Метод повертає дані отримані з датчиків""" - return AggregatedData( - Accelerometer(1, 2, 3), - Gps(4, 5), - datetime.now(), - config.USER_ID, - ) + def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: + self.accelerometer_filename = accelerometer_filename + self.gps_filename = gps_filename + + self._acc_f = None + self._gps_f = None + self._acc_reader: Optional[csv.reader] = None + self._gps_reader: Optional[csv.reader] = None + + self._started = False + + # one-row buffers (supports CSVs with or without header) + self._acc_buf: Optional[List[str]] = None + self._gps_buf: Optional[List[str]] = None + + self._acc_has_header: Optional[bool] = None + self._gps_has_header: Optional[bool] = None def startReading(self, *args, **kwargs): - """Метод повинен викликатись перед початком читання даних""" + """Must be called before read()""" + if self._started: + return + + if not Path(self.accelerometer_filename).exists(): + raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}") + if not Path(self.gps_filename).exists(): + raise FileNotFoundError(f"GPS file not found: {self.gps_filename}") + + self._open_files() + self._started = True def stopReading(self, *args, **kwargs): - """Метод повинен викликатись для закінчення читання даних""" + """Must be called when finishing reading""" + self._close_files() + self._started = False + + def read(self) -> AggregatedData: + """Return one combined sample (acc + gps).""" + if not self._started: + raise RuntimeError("Datasource is not started. Call startReading() before read().") + + acc_row = self._next_acc_row() + gps_row = self._next_gps_row() + + acc = self._parse_acc(acc_row) + gps = self._parse_gps(gps_row) + + # IMPORTANT: timing belongs to datasource (not MQTT / main.py) + if config.DELAY and config.DELAY > 0: + time.sleep(float(config.DELAY)) + + return AggregatedData( + accelerometer=acc, + gps=gps, + time=datetime.utcnow(), + user_id=config.USER_ID, + ) + + # ---------------- internal ---------------- + + def _open_files(self) -> None: + self._close_files() + + self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") + self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") + + self._acc_reader = csv.reader(self._acc_f) + self._gps_reader = csv.reader(self._gps_f) + + self._acc_buf = None + self._gps_buf = None + + self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") + ) + self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") + ) + + def _close_files(self) -> None: + for f in (self._acc_f, self._gps_f): + try: + if f is not None: + f.close() + except Exception: + pass + + self._acc_f = None + self._gps_f = None + self._acc_reader = None + self._gps_reader = None + self._acc_buf = None + self._gps_buf = None + self._acc_has_header = None + self._gps_has_header = None + + def _rewind_acc(self) -> None: + if self._acc_f is None: + raise RuntimeError("Accelerometer file is not open.") + self._acc_f.seek(0) + self._acc_reader = csv.reader(self._acc_f) + self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") + ) + + def _rewind_gps(self) -> None: + if self._gps_f is None: + raise RuntimeError("GPS file is not open.") + self._gps_f.seek(0) + self._gps_reader = csv.reader(self._gps_f) + self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") + ) + + def _next_acc_row(self) -> List[str]: + if self._acc_reader is None: + raise RuntimeError("Accelerometer reader is not initialized.") + + while True: + if self._acc_buf is not None: + row = self._acc_buf + self._acc_buf = None + else: + row = next(self._acc_reader, None) + + if row is None: + # EOF -> rewind & continue + self._rewind_acc() + continue + + row = [c.strip() for c in row] + if not row or not any(row): + continue + + return row + + def _next_gps_row(self) -> List[str]: + if self._gps_reader is None: + raise RuntimeError("GPS reader is not initialized.") + + while True: + if self._gps_buf is not None: + row = self._gps_buf + self._gps_buf = None + else: + row = next(self._gps_reader, None) + + if row is None: + # EOF -> rewind & continue + self._rewind_gps() + continue + + row = [c.strip() for c in row] + if not row or not any(row): + continue + + return row + + @staticmethod + def _detect_header_and_buffer( + rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...] + ) -> tuple[bool, Optional[List[str]]]: + + first = None + while True: + first = next(rdr, None) + if first is None: + return False, None + first = [c.strip() for c in first] + if first and any(first): + break + + norm = [c.lower() for c in first] + + # Header if it contains the expected tokens + if all(tok in norm for tok in header_tokens): + return True, None + + # If first row is numeric-like and has enough columns => it's data (buffer it back) + if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]): + return False, first + + # Otherwise treat it as header-ish (skip it) + return True, None + + @staticmethod + def _parse_acc(row: List[str]) -> Accelerometer: + if len(row) < 3: + raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") + x = int(float(row[0])) + y = int(float(row[1])) + z = int(float(row[2])) + return Accelerometer(x=x, y=y, z=z) + + @staticmethod + def _parse_gps(row: List[str]) -> Gps: + if len(row) < 2: + raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") + lon = float(row[0]) + lat = float(row[1]) + return Gps(longitude=lon, latitude=lat) + + @staticmethod + def _is_number(s: str) -> bool: + try: + float(s) + return True + except Exception: + return False \ No newline at end of file diff --git a/agent/src/main.py b/agent/src/main.py index 52351bd..5513bf4 100644 --- a/agent/src/main.py +++ b/agent/src/main.py @@ -1,6 +1,4 @@ from paho.mqtt import client as mqtt_client -import json -import time from schema.aggregated_data_schema import AggregatedDataSchema from file_datasource import FileDatasource import config @@ -24,19 +22,14 @@ def connect_mqtt(broker, port): return client -def publish(client, topic, datasource, delay): +def publish(client, topic, datasource): datasource.startReading() while True: - time.sleep(delay) data = datasource.read() msg = AggregatedDataSchema().dumps(data) result = client.publish(topic, msg) - # result: [0, 1] status = result[0] - if status == 0: - pass - # print(f"Send `{msg}` to topic `{topic}`") - else: + if status != 0: print(f"Failed to send message to topic {topic}") @@ -44,10 +37,10 @@ def run(): # Prepare mqtt client client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) # Prepare datasource - datasource = FileDatasource("data/data.csv", "data/gps_data.csv") + datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv") # Infinity publish data - publish(client, config.MQTT_TOPIC, datasource, config.DELAY) + publish(client, config.MQTT_TOPIC, datasource) if __name__ == "__main__": - run() + run() \ No newline at end of file From 07a0e906d8e5e249db49e79593810dd0a1e42661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 14:21:41 +0200 Subject: [PATCH 02/15] Fix timestamp field in AggregatedData --- agent/src/file_datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 145188b..8781d80 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -68,7 +68,7 @@ class FileDatasource: return AggregatedData( accelerometer=acc, gps=gps, - time=datetime.utcnow(), + timestamp=datetime.utcnow(), user_id=config.USER_ID, ) From c5d98d53cd87c1355c57295041f238cab94796f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 14:25:27 +0200 Subject: [PATCH 03/15] Add mosquitto runtime folders to gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..24a7bbe --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +agent/docker/mosquitto/data/ +agent/docker/mosquitto/log/ \ No newline at end of file From 3d94bf30083f6df047cf9ec6d9a9739397db1947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 15:12:49 +0200 Subject: [PATCH 04/15] Parse accelerometer values as int16 (remove float conversion) --- agent/src/file_datasource.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 8781d80..00becfb 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -14,6 +14,9 @@ import config class FileDatasource: + INT16_MIN = -32768 + INT16_MAX = 32767 + def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: self.accelerometer_filename = accelerometer_filename self.gps_filename = gps_filename @@ -199,13 +202,24 @@ class FileDatasource: # Otherwise treat it as header-ish (skip it) return True, None + @staticmethod + def _parse_int16(s: str) -> int: + """ + Parse a signed 16-bit integer from string representation. + The file is expected to contain integer literals (e.g. "123", "-42"). + """ + v = int(s) + if not FileDatasource.INT16_MIN <= v <= FileDatasource.INT16_MAX: + raise ValueError(f"Value {v} is out of signed 16-bit range") + return v + @staticmethod def _parse_acc(row: List[str]) -> Accelerometer: if len(row) < 3: raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") - x = int(float(row[0])) - y = int(float(row[1])) - z = int(float(row[2])) + x = FileDatasource._parse_int16(row[0]) + y = FileDatasource._parse_int16(row[1]) + z = FileDatasource._parse_int16(row[2]) return Accelerometer(x=x, y=y, z=z) @staticmethod From 16437670941c4dea346eb833911f2cfa750b7b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 15:41:41 +0200 Subject: [PATCH 05/15] Remove int16 range check per review --- agent/src/file_datasource.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 00becfb..5ce6e11 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -204,14 +204,7 @@ class FileDatasource: @staticmethod def _parse_int16(s: str) -> int: - """ - Parse a signed 16-bit integer from string representation. - The file is expected to contain integer literals (e.g. "123", "-42"). - """ - v = int(s) - if not FileDatasource.INT16_MIN <= v <= FileDatasource.INT16_MAX: - raise ValueError(f"Value {v} is out of signed 16-bit range") - return v + return int(s) @staticmethod def _parse_acc(row: List[str]) -> Accelerometer: From ca790e73066088f1953cb79ffa176863588b9fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 19:55:50 +0200 Subject: [PATCH 06/15] Remove int16 binding from datasource --- agent/src/file_datasource.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 5ce6e11..ca1cc1c 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -14,8 +14,6 @@ import config class FileDatasource: - INT16_MIN = -32768 - INT16_MAX = 32767 def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: self.accelerometer_filename = accelerometer_filename @@ -203,16 +201,21 @@ class FileDatasource: return True, None @staticmethod - def _parse_int16(s: str) -> int: + def _parse_int(s: str) -> int: return int(s) @staticmethod def _parse_acc(row: List[str]) -> Accelerometer: if len(row) < 3: raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") - x = FileDatasource._parse_int16(row[0]) - y = FileDatasource._parse_int16(row[1]) - z = FileDatasource._parse_int16(row[2]) + + try: + x = int(row[0]) + y = int(row[1]) + z = int(row[2]) + except ValueError as e: + raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e + return Accelerometer(x=x, y=y, z=z) @staticmethod From a25fbfc3ef6f1bd0b5aed8cea311ab5b329ad4ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 20:14:56 +0200 Subject: [PATCH 07/15] Simplify accelerometer parsing and remove int16-specific leftovers --- agent/src/file_datasource.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index ca1cc1c..7bf7a45 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -14,7 +14,6 @@ import config class FileDatasource: - def __init__(self, accelerometer_filename: str, gps_filename: str) -> None: self.accelerometer_filename = accelerometer_filename self.gps_filename = gps_filename @@ -200,21 +199,14 @@ class FileDatasource: # Otherwise treat it as header-ish (skip it) return True, None - @staticmethod - def _parse_int(s: str) -> int: - return int(s) - @staticmethod def _parse_acc(row: List[str]) -> Accelerometer: if len(row) < 3: raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") - try: - x = int(row[0]) - y = int(row[1]) - z = int(row[2]) - except ValueError as e: - raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e + x = int(row[0]) + y = int(row[1]) + z = int(row[2]) return Accelerometer(x=x, y=y, z=z) From 75613fd4fc5796b4f67e59da0c958a3f099fc91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 20:47:00 +0200 Subject: [PATCH 08/15] Restore input validation for accelerometer parsing --- agent/src/file_datasource.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 7bf7a45..4e359a3 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -204,9 +204,14 @@ class FileDatasource: if len(row) < 3: raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}") - x = int(row[0]) - y = int(row[1]) - z = int(row[2]) + try: + x = int(row[0]) + y = int(row[1]) + z = int(row[2]) + except ValueError as e: + raise ValueError( + f"Invalid accelerometer values (expected integers): {row}" + ) from e return Accelerometer(x=x, y=y, z=z) From 3e0b4762ef467c2c282aad3131a90eb146d9d5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Tue, 24 Feb 2026 22:11:37 +0200 Subject: [PATCH 09/15] Optimize CSV parsing by adding skipinitialspace=True to csv.reader and removing unnecessary strip() calls --- agent/src/file_datasource.py | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 4e359a3..ed6937d 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import csv import time from datetime import datetime @@ -29,9 +27,6 @@ class FileDatasource: self._acc_buf: Optional[List[str]] = None self._gps_buf: Optional[List[str]] = None - self._acc_has_header: Optional[bool] = None - self._gps_has_header: Optional[bool] = None - def startReading(self, *args, **kwargs): """Must be called before read()""" if self._started: @@ -80,16 +75,17 @@ class FileDatasource: self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8") self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8") - self._acc_reader = csv.reader(self._acc_f) - self._gps_reader = csv.reader(self._gps_f) + self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) self._acc_buf = None self._gps_buf = None - self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + # detect header / buffer first data row (we only need the buffered row) + _, self._acc_buf = self._detect_header_and_buffer( self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") ) - self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + _, self._gps_buf = self._detect_header_and_buffer( self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) @@ -107,15 +103,13 @@ class FileDatasource: self._gps_reader = None self._acc_buf = None self._gps_buf = None - self._acc_has_header = None - self._gps_has_header = None def _rewind_acc(self) -> None: if self._acc_f is None: raise RuntimeError("Accelerometer file is not open.") self._acc_f.seek(0) - self._acc_reader = csv.reader(self._acc_f) - self._acc_has_header, self._acc_buf = self._detect_header_and_buffer( + self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + _, self._acc_buf = self._detect_header_and_buffer( self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") ) @@ -123,8 +117,8 @@ class FileDatasource: if self._gps_f is None: raise RuntimeError("GPS file is not open.") self._gps_f.seek(0) - self._gps_reader = csv.reader(self._gps_f) - self._gps_has_header, self._gps_buf = self._detect_header_and_buffer( + self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) + _, self._gps_buf = self._detect_header_and_buffer( self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) @@ -144,7 +138,6 @@ class FileDatasource: self._rewind_acc() continue - row = [c.strip() for c in row] if not row or not any(row): continue @@ -166,7 +159,6 @@ class FileDatasource: self._rewind_gps() continue - row = [c.strip() for c in row] if not row or not any(row): continue @@ -182,7 +174,6 @@ class FileDatasource: first = next(rdr, None) if first is None: return False, None - first = [c.strip() for c in first] if first and any(first): break @@ -209,9 +200,7 @@ class FileDatasource: y = int(row[1]) z = int(row[2]) except ValueError as e: - raise ValueError( - f"Invalid accelerometer values (expected integers): {row}" - ) from e + raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e return Accelerometer(x=x, y=y, z=z) From fe66df9b8caf92eabc13dd1e30a7152d5cea1118 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 11:50:56 +0200 Subject: [PATCH 10/15] Refactor row reading logic for clarity and efficiency --- agent/src/file_datasource.py | 46 ++++++++++-------------------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index ed6937d..fe58e16 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -50,8 +50,8 @@ class FileDatasource: if not self._started: raise RuntimeError("Datasource is not started. Call startReading() before read().") - acc_row = self._next_acc_row() - gps_row = self._next_gps_row() + acc_row = self._get_next_row(self._acc_reader, self._acc_buf) + gps_row = self._get_next_row(self._gps_reader, self._gps_buf) acc = self._parse_acc(acc_row) gps = self._parse_gps(gps_row) @@ -122,44 +122,24 @@ class FileDatasource: self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) - def _next_acc_row(self) -> List[str]: - if self._acc_reader is None: - raise RuntimeError("Accelerometer reader is not initialized.") + def _get_next_row(self, reader, buffer) -> List[str]: + """Get the next valid row from the reader or buffer.""" + if reader is None: + raise RuntimeError("Reader is not initialized.") while True: - if self._acc_buf is not None: - row = self._acc_buf - self._acc_buf = None + if buffer is not None: + row = buffer + buffer = None else: - row = next(self._acc_reader, None) + row = next(reader, None) if row is None: # EOF -> rewind & continue - self._rewind_acc() + self._rewind_acc() if reader == self._acc_reader else self._rewind_gps() continue - if not row or not any(row): - continue - - return row - - def _next_gps_row(self) -> List[str]: - if self._gps_reader is None: - raise RuntimeError("GPS reader is not initialized.") - - while True: - if self._gps_buf is not None: - row = self._gps_buf - self._gps_buf = None - else: - row = next(self._gps_reader, None) - - if row is None: - # EOF -> rewind & continue - self._rewind_gps() - continue - - if not row or not any(row): + if not row or not any(cell.strip() for cell in row): continue return row @@ -177,7 +157,7 @@ class FileDatasource: if first and any(first): break - norm = [c.lower() for c in first] + norm = [c.strip().lower() for c in first] # Header if it contains the expected tokens if all(tok in norm for tok in header_tokens): From e4be6b0a194b4c649c7662e6149e5b4458ee7cef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 12:00:39 +0200 Subject: [PATCH 11/15] Refactor file rewinding logic to skip header row after seek(0) --- agent/src/file_datasource.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index fe58e16..2ac0645 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -109,6 +109,7 @@ class FileDatasource: raise RuntimeError("Accelerometer file is not open.") self._acc_f.seek(0) self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) + next(self._acc_reader) # Skip header row _, self._acc_buf = self._detect_header_and_buffer( self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") ) @@ -118,6 +119,7 @@ class FileDatasource: raise RuntimeError("GPS file is not open.") self._gps_f.seek(0) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) + next(self._gps_reader) # Skip header row _, self._gps_buf = self._detect_header_and_buffer( self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) From d621390f5161e28c47e80ec974e7c862d7d33189 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 12:11:05 +0200 Subject: [PATCH 12/15] Refactor file rewind logic to skip header row and remove unnecessary buffers --- agent/src/file_datasource.py | 37 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 2ac0645..90e5ba3 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -24,8 +24,8 @@ class FileDatasource: self._started = False # one-row buffers (supports CSVs with or without header) - self._acc_buf: Optional[List[str]] = None - self._gps_buf: Optional[List[str]] = None + self._acc_has_header: Optional[bool] = None + self._gps_has_header: Optional[bool] = None def startReading(self, *args, **kwargs): """Must be called before read()""" @@ -50,8 +50,8 @@ class FileDatasource: if not self._started: raise RuntimeError("Datasource is not started. Call startReading() before read().") - acc_row = self._get_next_row(self._acc_reader, self._acc_buf) - gps_row = self._get_next_row(self._gps_reader, self._gps_buf) + acc_row = self._get_next_row(self._acc_reader) + gps_row = self._get_next_row(self._gps_reader) acc = self._parse_acc(acc_row) gps = self._parse_gps(gps_row) @@ -78,16 +78,12 @@ class FileDatasource: self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) - self._acc_buf = None - self._gps_buf = None + self._acc_has_header = None + self._gps_has_header = None # detect header / buffer first data row (we only need the buffered row) - _, self._acc_buf = self._detect_header_and_buffer( - self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") - ) - _, self._gps_buf = self._detect_header_and_buffer( - self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") - ) + self._rewind_acc() + self._rewind_gps() def _close_files(self) -> None: for f in (self._acc_f, self._gps_f): @@ -101,8 +97,6 @@ class FileDatasource: self._gps_f = None self._acc_reader = None self._gps_reader = None - self._acc_buf = None - self._gps_buf = None def _rewind_acc(self) -> None: if self._acc_f is None: @@ -110,7 +104,7 @@ class FileDatasource: self._acc_f.seek(0) self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) next(self._acc_reader) # Skip header row - _, self._acc_buf = self._detect_header_and_buffer( + self._acc_has_header, _ = self._detect_header_and_buffer( self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") ) @@ -120,22 +114,17 @@ class FileDatasource: self._gps_f.seek(0) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) next(self._gps_reader) # Skip header row - _, self._gps_buf = self._detect_header_and_buffer( + self._gps_has_header, _ = self._detect_header_and_buffer( self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) - def _get_next_row(self, reader, buffer) -> List[str]: - """Get the next valid row from the reader or buffer.""" + def _get_next_row(self, reader) -> List[str]: + """Get the next valid row from the reader.""" if reader is None: raise RuntimeError("Reader is not initialized.") while True: - if buffer is not None: - row = buffer - buffer = None - else: - row = next(reader, None) - + row = next(reader, None) if row is None: # EOF -> rewind & continue self._rewind_acc() if reader == self._acc_reader else self._rewind_gps() From f58596ebf7aa4ae01867cfb82712788cfa57e04a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 12:35:58 +0200 Subject: [PATCH 13/15] Refactor FileDatasource: remove unused header detection variables --- agent/src/file_datasource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 90e5ba3..03143d2 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -104,7 +104,7 @@ class FileDatasource: self._acc_f.seek(0) self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) next(self._acc_reader) # Skip header row - self._acc_has_header, _ = self._detect_header_and_buffer( + _ = self._detect_header_and_buffer( self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") ) @@ -114,7 +114,7 @@ class FileDatasource: self._gps_f.seek(0) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) next(self._gps_reader) # Skip header row - self._gps_has_header, _ = self._detect_header_and_buffer( + _ = self._detect_header_and_buffer( self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") ) From 953b0bdb9a35f4191bd5e3c56d01619f8a4ad2c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 20:56:43 +0200 Subject: [PATCH 14/15] Remove unused _detect_header_and_buffer method and related fields --- agent/src/file_datasource.py | 67 +++++++----------------------------- 1 file changed, 12 insertions(+), 55 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 03143d2..8aa8187 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -23,10 +23,6 @@ class FileDatasource: self._started = False - # one-row buffers (supports CSVs with or without header) - self._acc_has_header: Optional[bool] = None - self._gps_has_header: Optional[bool] = None - def startReading(self, *args, **kwargs): """Must be called before read()""" if self._started: @@ -50,8 +46,8 @@ class FileDatasource: if not self._started: raise RuntimeError("Datasource is not started. Call startReading() before read().") - acc_row = self._get_next_row(self._acc_reader) - gps_row = self._get_next_row(self._gps_reader) + acc_row = self._get_next_row(self._acc_reader, source="acc") + gps_row = self._get_next_row(self._gps_reader, source="gps") acc = self._parse_acc(acc_row) gps = self._parse_gps(gps_row) @@ -78,10 +74,6 @@ class FileDatasource: self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) - self._acc_has_header = None - self._gps_has_header = None - - # detect header / buffer first data row (we only need the buffered row) self._rewind_acc() self._rewind_gps() @@ -103,22 +95,16 @@ class FileDatasource: raise RuntimeError("Accelerometer file is not open.") self._acc_f.seek(0) self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) - next(self._acc_reader) # Skip header row - _ = self._detect_header_and_buffer( - self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z") - ) + next(self._acc_reader, None) # skip header row def _rewind_gps(self) -> None: if self._gps_f is None: raise RuntimeError("GPS file is not open.") self._gps_f.seek(0) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) - next(self._gps_reader) # Skip header row - _ = self._detect_header_and_buffer( - self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude") - ) + next(self._gps_reader, None) # skip header row - def _get_next_row(self, reader) -> List[str]: + def _get_next_row(self, reader, source: str) -> List[str]: """Get the next valid row from the reader.""" if reader is None: raise RuntimeError("Reader is not initialized.") @@ -127,7 +113,12 @@ class FileDatasource: row = next(reader, None) if row is None: # EOF -> rewind & continue - self._rewind_acc() if reader == self._acc_reader else self._rewind_gps() + if source == "acc": + self._rewind_acc() + reader = self._acc_reader + else: + self._rewind_gps() + reader = self._gps_reader continue if not row or not any(cell.strip() for cell in row): @@ -135,32 +126,6 @@ class FileDatasource: return row - @staticmethod - def _detect_header_and_buffer( - rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...] - ) -> tuple[bool, Optional[List[str]]]: - - first = None - while True: - first = next(rdr, None) - if first is None: - return False, None - if first and any(first): - break - - norm = [c.strip().lower() for c in first] - - # Header if it contains the expected tokens - if all(tok in norm for tok in header_tokens): - return True, None - - # If first row is numeric-like and has enough columns => it's data (buffer it back) - if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]): - return False, first - - # Otherwise treat it as header-ish (skip it) - return True, None - @staticmethod def _parse_acc(row: List[str]) -> Accelerometer: if len(row) < 3: @@ -181,12 +146,4 @@ class FileDatasource: raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}") lon = float(row[0]) lat = float(row[1]) - return Gps(longitude=lon, latitude=lat) - - @staticmethod - def _is_number(s: str) -> bool: - try: - float(s) - return True - except Exception: - return False \ No newline at end of file + return Gps(longitude=lon, latitude=lat) \ No newline at end of file From 9473c5a6216aa8bafb0a467e223263c14901baa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9E=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD=D0=B4=D1=80=20?= =?UTF-8?q?=D0=93=D1=83=D1=80=D0=B0=D0=BD=D0=B5=D1=86=D1=8C?= Date: Wed, 25 Feb 2026 21:08:54 +0200 Subject: [PATCH 15/15] Remove unnecessary rewind after file open --- agent/src/file_datasource.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/src/file_datasource.py b/agent/src/file_datasource.py index 8aa8187..363f1df 100644 --- a/agent/src/file_datasource.py +++ b/agent/src/file_datasource.py @@ -74,8 +74,10 @@ class FileDatasource: self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True) self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True) - self._rewind_acc() - self._rewind_gps() + # File pointer is already at 0 right after open(), so no need to rewind here. + # Skip header row once. + next(self._acc_reader, None) + next(self._gps_reader, None) def _close_files(self) -> None: for f in (self._acc_f, self._gps_f):