FileReader #5
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
agent/docker/mosquitto/data/
|
||||||
|
agent/docker/mosquitto/log/
|
||||||
@@ -1,5 +1,9 @@
|
|||||||
from csv import reader
|
import csv
|
||||||
|
|
|||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
@@ -7,24 +11,141 @@ import config
|
|||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
accelerometer_filename: str,
|
|
||||||
gps_filename: str,
|
|
||||||
) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
self.accelerometer_filename = accelerometer_filename
|
||||||
return AggregatedData(
|
self.gps_filename = gps_filename
|
||||||
Accelerometer(1, 2, 3),
|
|
||||||
Gps(4, 5),
|
self._acc_f = None
|
||||||
datetime.now(),
|
self._gps_f = None
|
||||||
config.USER_ID,
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
)
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Must be called before read()"""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not Path(self.accelerometer_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
|
if not Path(self.gps_filename).exists():
|
||||||
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
|
self._open_files()
|
||||||
|
self._started = True
|
||||||
|
|
||||||
def stopReading(self, *args, **kwargs):
|
def stopReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись для закінчення читання даних"""
|
"""Must be called when finishing reading"""
|
||||||
|
self._close_files()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def read(self) -> AggregatedData:
|
||||||
|
"""Return one combined sample (acc + gps)."""
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
|
acc_row = self._get_next_row(self._acc_reader, source="acc")
|
||||||
|
gps_row = self._get_next_row(self._gps_reader, source="gps")
|
||||||
|
|
||||||
|
acc = self._parse_acc(acc_row)
|
||||||
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
|
if config.DELAY and config.DELAY > 0:
|
||||||
|
time.sleep(float(config.DELAY))
|
||||||
|
|
||||||
|
return AggregatedData(
|
||||||
|
accelerometer=acc,
|
||||||
|
gps=gps,
|
||||||
|
timestamp=datetime.utcnow(),
|
||||||
|
user_id=config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------- internal ----------------
|
||||||
|
|
||||||
|
def _open_files(self) -> None:
|
||||||
|
self._close_files()
|
||||||
|
|
||||||
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
You just can not return this variable from _detect_header_and_buffer() if it is not used You just can not return this variable from _detect_header_and_buffer() if it is not used
removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods
|
|||||||
|
|
||||||
|
# File pointer is already at 0 right after open(), so no need to rewind here.
|
||||||
|
# Skip header row once.
|
||||||
|
if you open file, your read pointer is already 0, why to set it to 0 again? if you open file, your read pointer is already 0, why to set it to 0 again?
I removed the unnecessary rewind from _open_files(). I removed the unnecessary rewind from _open_files().
|
|||||||
|
next(self._acc_reader, None)
|
||||||
|
next(self._gps_reader, None)
|
||||||
|
|
||||||
|
def _close_files(self) -> None:
|
||||||
|
for f in (self._acc_f, self._gps_f):
|
||||||
|
try:
|
||||||
|
if f is not None:
|
||||||
|
f.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader = None
|
||||||
|
self._gps_reader = None
|
||||||
|
|
||||||
|
def _rewind_acc(self) -> None:
|
||||||
|
if self._acc_f is None:
|
||||||
|
raise RuntimeError("Accelerometer file is not open.")
|
||||||
|
self._acc_f.seek(0)
|
||||||
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
|
next(self._acc_reader, None) # skip header row
|
||||||
|
|
||||||
|
def _rewind_gps(self) -> None:
|
||||||
|
if self._gps_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
Why leave _ here if it's not used? Why leave _ here if it's not used?
Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it. Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it.
|
|||||||
|
self._gps_f.seek(0)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
next(self._gps_reader, None) # skip header row
|
||||||
|
|
||||||
|
def _get_next_row(self, reader, source: str) -> List[str]:
|
||||||
|
"""Get the next valid row from the reader."""
|
||||||
|
if reader is None:
|
||||||
|
you can just skip first row after seek(0) knowing that this is a header, not actual value, you can just skip first row after seek(0) knowing that this is a header, not actual value,
so you can write like
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f)
next(self._acc_reader)
row = next(self.acc_reader)
And do the same for other rewinders And do the same for other rewinders
Refactoring file rewind logic to skip header line after seek(0) and for other rewinders Refactoring file rewind logic to skip header line after seek(0) and for other rewinders
|
|||||||
|
raise RuntimeError("Reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
Why leave _ here if it's not used? Why leave _ here if it's not used?
Maybe dont return anything from this method?
As I see this method doesn't modify any class fiels, so maybe just remove this method? As I see this method doesn't modify any class fiels, so maybe just remove this method?
The _detect_header_and_buffer method was not modifying any class state and its return value was unused. The _detect_header_and_buffer method was not modifying any class state and its return value was unused.
I removed the method and simplified the logic.
|
|||||||
|
row = next(reader, None)
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
if source == "acc":
|
||||||
|
self._rewind_acc()
|
||||||
|
reader = self._acc_reader
|
||||||
|
else:
|
||||||
|
self._rewind_gps()
|
||||||
|
reader = self._gps_reader
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not row or not any(cell.strip() for cell in row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
This looks a bit comlex This looks a bit comlex
Why itarate though loop to get 1 row?
"Simplified the line reading logic by replacing the while True loop with a clearer check "Simplified the line reading logic by replacing the while True loop with a clearer check
|
|||||||
|
@staticmethod
|
||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
|
if len(row) < 3:
|
||||||
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
x = int(row[0])
|
||||||
|
y = int(row[1])
|
||||||
|
z = int(row[2])
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||||
|
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_gps(row: List[str]) -> Gps:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
return Gps(longitude=lon, latitude=lat)
|
||||||
@@ -1,6 +1,4 @@
|
|||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||||
from file_datasource import FileDatasource
|
from file_datasource import FileDatasource
|
||||||
import config
|
import config
|
||||||
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource, delay):
|
def publish(client, topic, datasource):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
while True:
|
while True:
|
||||||
time.sleep(delay)
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
# result: [0, 1]
|
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status == 0:
|
if status != 0:
|
||||||
pass
|
|
||||||
# print(f"Send `{msg}` to topic `{topic}`")
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
@@ -44,9 +37,9 @@ def run():
|
|||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
publish(client, config.MQTT_TOPIC, datasource)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Naming convention is misleading, suggests parser specifically takes 16-bit integers. This parser should read any integer value and should just be
int(row[0]), not a separate function callSeveral identical issues occur in the next few lines
Unused variables, seem to be leftovers from previously removed INT16 boundary checking
removed unnecessary variables
Method is not used and does not provide additional functionality compared to default
int()function, so should be removedFixed: removed int16-specific naming/helpers and now parse accelerometer values directly using int(row[i]) without any bit-width assumptions. Pushed the update to the PR.
Removed the unused _parse_int helper (it didn’t add any functionality over built-in int()). Pushed the update to the PR.
And after chage of rewinders, you can remove _acc_buf to hold first row
What does this means?
if assuming this is a header with indexies of domain objects maybe make sense.
Removed unnecessary buffers like _acc_buf and _gps_buf.
I initially tried removing this part, but the testing failed without it