FileReader #5

Merged
bacant150 merged 15 commits from lab1_huranets into lab1 2026-02-26 10:04:10 +02:00
3 changed files with 145 additions and 29 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
agent/docker/mosquitto/data/
agent/docker/mosquitto/log/

View File

@@ -1,5 +1,9 @@
from csv import reader
import csv
hasslesstech commented 2026-02-24 19:37:02 +02:00 (Migrated from github.com)
Review

Naming convention is misleading, suggests parser specifically takes 16-bit integers. This parser should read any integer value and should just be int(row[0]), not a separate function call

Several identical issues occur in the next few lines

Naming convention is misleading, suggests parser specifically takes 16-bit integers. This parser should read any integer value and should just be `int(row[0])`, not a separate function call Several identical issues occur in the next few lines
hasslesstech commented 2026-02-24 19:37:45 +02:00 (Migrated from github.com)
Review

Unused variables, seem to be leftovers from previously removed INT16 boundary checking

Unused variables, seem to be leftovers from previously removed INT16 boundary checking
bacant150 commented 2026-02-24 20:01:55 +02:00 (Migrated from github.com)
Review

removed unnecessary variables

removed unnecessary variables
hasslesstech commented 2026-02-24 20:04:13 +02:00 (Migrated from github.com)
Review

Method is not used and does not provide additional functionality compared to default int() function, so should be removed

Method is not used and does not provide additional functionality compared to default `int()` function, so should be removed
bacant150 commented 2026-02-24 20:23:35 +02:00 (Migrated from github.com)
Review

Fixed: removed int16-specific naming/helpers and now parse accelerometer values directly using int(row[i]) without any bit-width assumptions. Pushed the update to the PR.

Fixed: removed int16-specific naming/helpers and now parse accelerometer values directly using int(row[i]) without any bit-width assumptions. Pushed the update to the PR.
bacant150 commented 2026-02-24 20:24:06 +02:00 (Migrated from github.com)
Review

Removed the unused _parse_int helper (it didn’t add any functionality over built-in int()). Pushed the update to the PR.

Removed the unused _parse_int helper (it didn’t add any functionality over built-in int()). Pushed the update to the PR.
VladiusVostokus commented 2026-02-25 11:17:41 +02:00 (Migrated from github.com)
Review

And after the change to the rewinders, you can remove _acc_buf, which holds the first row

And after the change to the rewinders, you can remove _acc_buf, which holds the first row
VladiusVostokus commented 2026-02-25 11:24:43 +02:00 (Migrated from github.com)
Review

What does this mean?
If we assume this is a header with the indices of domain objects, it may make sense.

What does this mean? If we assume this is a header with the indices of domain objects, it may make sense.
bacant150 commented 2026-02-25 12:14:42 +02:00 (Migrated from github.com)
Review

Removed unnecessary buffers like _acc_buf and _gps_buf.

Removed unnecessary buffers like _acc_buf and _gps_buf.
bacant150 commented 2026-02-25 12:53:57 +02:00 (Migrated from github.com)
Review

I initially tried removing this part, but the testing failed without it

I initially tried removing this part, but the testing failed without it
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, List
from domain.accelerometer import Accelerometer
from domain.gps import Gps
from domain.aggregated_data import AggregatedData
@@ -7,24 +11,141 @@ import config
class FileDatasource:
# Stub constructor: accepts both file names but stores nothing (the body is `pass`).
# NOTE(review): a second __init__ with the same parameters is defined further down in
# this listing and would shadow this one — this looks like the pre-change side of the diff.
def __init__(
self,
accelerometer_filename: str,
gps_filename: str,
) -> None:
pass
# Placeholder read(): touches no file and always returns the same hard-coded
# Accelerometer(1, 2, 3) / Gps(4, 5) pair stamped with the current local time.
# NOTE(review): a later read() definition in this listing shadows this one.
def read(self) -> AggregatedData:
"""Return the data obtained from the sensors."""
return AggregatedData(
Accelerometer(1, 2, 3),
Gps(4, 5),
datetime.now(),
config.USER_ID,
)
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
    """Remember the two input paths; no file is opened here.

    Args:
        accelerometer_filename: path of the accelerometer CSV file.
        gps_filename: path of the GPS CSV file.
    """
    # Lifecycle flag: set by startReading(), cleared by stopReading().
    self._started = False

    # Accelerometer source: path, open file handle, csv reader bound to it.
    self.accelerometer_filename = accelerometer_filename
    self._acc_f = None
    self._acc_reader: Optional[csv.reader] = None

    # GPS source: same three pieces of state.
    self.gps_filename = gps_filename
    self._gps_f = None
    self._gps_reader: Optional[csv.reader] = None
def startReading(self, *args, **kwargs):
    """Open both data files; must be called before read().

    Calling it while the datasource is already started does nothing.
    Extra positional and keyword arguments are accepted and ignored.

    Raises:
        FileNotFoundError: if the accelerometer path or the GPS path does
            not exist (the accelerometer path is checked first).
    """
    if not self._started:
        acc_present = Path(self.accelerometer_filename).exists()
        if not acc_present:
            raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")

        gps_present = Path(self.gps_filename).exists()
        if not gps_present:
            raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")

        self._open_files()
        self._started = True
def stopReading(self, *args, **kwargs):
    """Close both data files and mark the datasource as stopped.

    Must be called when reading is finished. Extra positional and keyword
    arguments are accepted and ignored.
    """
    # Release the handles first; the flag is only cleared once that returns.
    self._close_files()
    self._started = False
def read(self) -> AggregatedData:
    """Return one combined sample (accelerometer + GPS).

    One row is pulled from each reader, both rows are parsed, and the call
    then sleeps for config.DELAY seconds (when that value is positive)
    before building the result.

    Raises:
        RuntimeError: if startReading() has not been called.
    """
    if not self._started:
        raise RuntimeError("Datasource is not started. Call startReading() before read().")

    # Fetch both raw rows before parsing either of them.
    raw_acc = self._get_next_row(self._acc_reader, source="acc")
    raw_gps = self._get_next_row(self._gps_reader, source="gps")

    accelerometer = self._parse_acc(raw_acc)
    position = self._parse_gps(raw_gps)

    # IMPORTANT: pacing is the datasource's job (not MQTT / main.py).
    pause = config.DELAY
    if pause and pause > 0:
        time.sleep(float(pause))

    return AggregatedData(
        accelerometer=accelerometer,
        gps=position,
        timestamp=datetime.utcnow(),
        user_id=config.USER_ID,
    )
# ---------------- internal ----------------
def _open_files(self) -> None:
self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
VladiusVostokus commented 2026-02-25 11:20:05 +02:00 (Migrated from github.com)
Review

You can simply not return this variable from _detect_header_and_buffer() if it is not used

You can simply not return this variable from _detect_header_and_buffer() if it is not used
bacant150 commented 2026-02-25 12:36:28 +02:00 (Migrated from github.com)
Review

removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods

removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods
# File pointer is already at 0 right after open(), so no need to rewind here.
# Skip header row once.
VladiusVostokus commented 2026-02-25 16:05:28 +02:00 (Migrated from github.com)
Review

When you open a file, the read pointer is already at 0, so why set it to 0 again?

When you open a file, the read pointer is already at 0, so why set it to 0 again?
bacant150 commented 2026-02-25 21:09:54 +02:00 (Migrated from github.com)
Review

I removed the unnecessary rewind from _open_files().

I removed the unnecessary rewind from _open_files().
next(self._acc_reader, None)
next(self._gps_reader, None)
def _close_files(self) -> None:
    """Close whichever data files are open and reset handles and readers to None.

    Best-effort: an OS-level failure while closing one file is logged as a
    warning and does not stop the other file from being closed. Safe to call
    when nothing is open.
    """
    import logging  # local import so this block does not depend on file-level imports

    handles = (self._acc_f, self._gps_f)

    # Detach all state up front so the object stays consistent even if a
    # close() call below fails.
    self._acc_f = None
    self._gps_f = None
    self._acc_reader = None
    self._gps_reader = None

    for handle in handles:
        if handle is None:
            continue
        try:
            handle.close()
        except OSError:
            # Previously every exception was swallowed silently; keep the
            # no-crash behaviour for I/O errors but leave a trace.
            logging.getLogger(__name__).warning(
                "Failed to close data file %r",
                getattr(handle, "name", handle),
                exc_info=True,
            )
def _rewind_acc(self) -> None:
    """Restart the accelerometer file from its first data row.

    Seeks the open handle back to offset 0, binds a fresh csv reader to it
    and consumes one row (the header).

    Raises:
        RuntimeError: if the accelerometer file is not open.
    """
    handle = self._acc_f
    if handle is None:
        raise RuntimeError("Accelerometer file is not open.")

    handle.seek(0)
    # Publish the new reader before consuming the header row from it.
    fresh_reader = csv.reader(handle, skipinitialspace=True)
    self._acc_reader = fresh_reader
    next(fresh_reader, None)  # discard the header row
def _rewind_gps(self) -> None:
if self._gps_f is None:
raise RuntimeError("GPS file is not open.")
VladiusVostokus commented 2026-02-25 16:06:38 +02:00 (Migrated from github.com)
Review

Why leave _ here if it's not used?

Why leave _ here if it's not used?
bacant150 commented 2026-02-25 21:13:00 +02:00 (Migrated from github.com)
Review

Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it.

Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it.
self._gps_f.seek(0)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
next(self._gps_reader, None) # skip header row
def _get_next_row(self, reader, source: str) -> List[str]:
"""Get the next valid row from the reader."""
if reader is None:
VladiusVostokus commented 2026-02-25 11:14:51 +02:00 (Migrated from github.com)
Review

You can just skip the first row after seek(0), knowing that it is a header and not an actual value,
so you can write something like
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f)
next(self._acc_reader)
row = next(self._acc_reader)

You can just skip the first row after seek(0), knowing that it is a header and not an actual value, so you can write something like self._acc_f.seek(0) self._acc_reader = csv.reader(self._acc_f) next(self._acc_reader) row = next(self._acc_reader)
VladiusVostokus commented 2026-02-25 11:15:57 +02:00 (Migrated from github.com)
Review

And do the same for other rewinders

And do the same for other rewinders
bacant150 commented 2026-02-25 12:02:34 +02:00 (Migrated from github.com)
Review

Refactoring file rewind logic to skip header line after seek(0) and for other rewinders

Refactoring file rewind logic to skip header line after seek(0) and for other rewinders
raise RuntimeError("Reader is not initialized.")
while True:
VladiusVostokus commented 2026-02-25 15:52:46 +02:00 (Migrated from github.com)
Review

Why leave _ here if it's not used?
Maybe don't return anything from this method?

Why leave _ here if it's not used? Maybe don't return anything from this method?
VladiusVostokus commented 2026-02-25 16:00:51 +02:00 (Migrated from github.com)
Review

As I see it, this method doesn't modify any class fields, so maybe just remove this method?

As I see it, this method doesn't modify any class fields, so maybe just remove this method?
bacant150 commented 2026-02-25 20:57:57 +02:00 (Migrated from github.com)
Review

The _detect_header_and_buffer method was not modifying any class state and its return value was unused.
I removed the method and simplified the logic.

The _detect_header_and_buffer method was not modifying any class state and its return value was unused. I removed the method and simplified the logic.
row = next(reader, None)
if row is None:
# EOF -> rewind & continue
if source == "acc":
self._rewind_acc()
reader = self._acc_reader
else:
self._rewind_gps()
reader = self._gps_reader
continue
if not row or not any(cell.strip() for cell in row):
continue
return row
VladiusVostokus commented 2026-02-25 11:10:29 +02:00 (Migrated from github.com)
Review

This looks a bit complex.
Why iterate through a loop to get 1 row?

This looks a bit complex. Why iterate through a loop to get 1 row?
bacant150 commented 2026-02-25 11:53:08 +02:00 (Migrated from github.com)
Review

"Simplified the line reading logic by replacing the while True loop with a clearer check

"Simplified the line reading logic by replacing the while True loop with a clearer check
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
    """Build an Accelerometer from the first three cells of a CSV row.

    Cells beyond the third are ignored.

    Raises:
        ValueError: if the row has fewer than three cells, or if any of the
            first three cells is not a valid integer.
    """
    if len(row) < 3:
        raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")

    try:
        # Converted left to right; the first bad cell aborts the conversion.
        x, y, z = (int(cell) for cell in row[:3])
    except ValueError as e:
        raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e

    return Accelerometer(x=x, y=y, z=z)
@staticmethod
def _parse_gps(row: List[str]) -> Gps:
    """Build a Gps from the first two cells of a CSV row (longitude, latitude).

    Cells beyond the second are ignored.

    Raises:
        ValueError: if the row has fewer than two cells, or if either of the
            first two cells cannot be converted to float.
    """
    if len(row) < 2:
        raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")

    try:
        lon = float(row[0])
        lat = float(row[1])
    except ValueError as e:
        # Mirror _parse_acc: report the whole offending row instead of the
        # bare float() message, and keep the original error as the cause.
        raise ValueError(f"Invalid GPS values (expected numbers): {row}") from e

    return Gps(longitude=lon, latitude=lat)

View File

@@ -1,6 +1,4 @@
from paho.mqtt import client as mqtt_client
import json
import time
from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource
import config
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
return client
def publish(client, topic, datasource, delay):
def publish(client, topic, datasource):
datasource.startReading()
while True:
time.sleep(delay)
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
pass
# print(f"Send `{msg}` to topic `{topic}`")
else:
if status != 0:
print(f"Failed to send message to topic {topic}")
@@ -44,9 +37,9 @@ def run():
# Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
publish(client, config.MQTT_TOPIC, datasource)
if __name__ == "__main__":