FileReader #5
@@ -1,5 +1,11 @@
|
|||||||
from csv import reader
|
from __future__ import annotations
|
||||||
|
|
|||||||
|
|
||||||
|
import csv
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
@@ -7,24 +13,213 @@ import config
|
|||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
accelerometer_filename: str,
|
|
||||||
gps_filename: str,
|
|
||||||
) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
self.accelerometer_filename = accelerometer_filename
|
||||||
return AggregatedData(
|
self.gps_filename = gps_filename
|
||||||
Accelerometer(1, 2, 3),
|
|
||||||
Gps(4, 5),
|
self._acc_f = None
|
||||||
datetime.now(),
|
self._gps_f = None
|
||||||
config.USER_ID,
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
)
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
# one-row buffers (supports CSVs with or without header)
|
||||||
|
self._acc_buf: Optional[List[str]] = None
|
||||||
|
self._gps_buf: Optional[List[str]] = None
|
||||||
|
|
||||||
|
self._acc_has_header: Optional[bool] = None
|
||||||
|
self._gps_has_header: Optional[bool] = None
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Must be called before read()"""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not Path(self.accelerometer_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
|
if not Path(self.gps_filename).exists():
|
||||||
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
|
self._open_files()
|
||||||
|
self._started = True
|
||||||
|
|
||||||
def stopReading(self, *args, **kwargs):
|
def stopReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись для закінчення читання даних"""
|
"""Must be called when finishing reading"""
|
||||||
|
self._close_files()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def read(self) -> AggregatedData:
|
||||||
|
"""Return one combined sample (acc + gps)."""
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
|
acc_row = self._next_acc_row()
|
||||||
|
gps_row = self._next_gps_row()
|
||||||
|
|
||||||
|
acc = self._parse_acc(acc_row)
|
||||||
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
|
if config.DELAY and config.DELAY > 0:
|
||||||
|
time.sleep(float(config.DELAY))
|
||||||
|
|
||||||
|
return AggregatedData(
|
||||||
|
accelerometer=acc,
|
||||||
|
gps=gps,
|
||||||
|
time=datetime.utcnow(),
|
||||||
|
user_id=config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------- internal ----------------
|
||||||
|
You just can not return this variable from _detect_header_and_buffer() if it is not used You just can not return this variable from _detect_header_and_buffer() if it is not used
removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods removed unnecessary variables from _detect_header_and_buffer() and fixed the _rewind_acc() and _rewind_gps() methods
|
|||||||
|
|
||||||
|
def _open_files(self) -> None:
|
||||||
|
self._close_files()
|
||||||
|
if you open file, your read pointer is already 0, why to set it to 0 again? if you open file, your read pointer is already 0, why to set it to 0 again?
I removed the unnecessary rewind from _open_files(). I removed the unnecessary rewind from _open_files().
|
|||||||
|
|
||||||
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _close_files(self) -> None:
|
||||||
|
for f in (self._acc_f, self._gps_f):
|
||||||
|
try:
|
||||||
|
if f is not None:
|
||||||
|
f.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
Why leave _ here if it's not used? Why leave _ here if it's not used?
Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it. Good catch. The _detect_header_and_buffer helper wasn’t used and didn’t modify class state, so I removed it.
|
|||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader = None
|
||||||
|
self._gps_reader = None
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
self._acc_has_header = None
|
||||||
|
self._gps_has_header = None
|
||||||
|
you can just skip first row after seek(0) knowing that this is a header, not actual value, you can just skip first row after seek(0) knowing that this is a header, not actual value,
so you can write like
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f)
next(self._acc_reader)
row = next(self.acc_reader)
And do the same for other rewinders And do the same for other rewinders
Refactoring file rewind logic to skip header line after seek(0) and for other rewinders Refactoring file rewind logic to skip header line after seek(0) and for other rewinders
|
|||||||
|
|
||||||
|
def _rewind_acc(self) -> None:
|
||||||
|
if self._acc_f is None:
|
||||||
|
Why leave _ here if it's not used? Why leave _ here if it's not used?
Maybe dont return anything from this method?
As I see this method doesn't modify any class fiels, so maybe just remove this method? As I see this method doesn't modify any class fiels, so maybe just remove this method?
The _detect_header_and_buffer method was not modifying any class state and its return value was unused. The _detect_header_and_buffer method was not modifying any class state and its return value was unused.
I removed the method and simplified the logic.
|
|||||||
|
raise RuntimeError("Accelerometer file is not open.")
|
||||||
|
self._acc_f.seek(0)
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _rewind_gps(self) -> None:
|
||||||
|
if self._gps_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
self._gps_f.seek(0)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
This looks a bit comlex This looks a bit comlex
Why itarate though loop to get 1 row?
"Simplified the line reading logic by replacing the while True loop with a clearer check "Simplified the line reading logic by replacing the while True loop with a clearer check
|
|||||||
|
def _next_acc_row(self) -> List[str]:
|
||||||
|
if self._acc_reader is None:
|
||||||
|
raise RuntimeError("Accelerometer reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._acc_buf is not None:
|
||||||
|
row = self._acc_buf
|
||||||
|
self._acc_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._acc_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_acc()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
def _next_gps_row(self) -> List[str]:
|
||||||
|
if self._gps_reader is None:
|
||||||
|
raise RuntimeError("GPS reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._gps_buf is not None:
|
||||||
|
row = self._gps_buf
|
||||||
|
self._gps_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._gps_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_gps()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _detect_header_and_buffer(
|
||||||
|
rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...]
|
||||||
|
) -> tuple[bool, Optional[List[str]]]:
|
||||||
|
|
||||||
|
first = None
|
||||||
|
while True:
|
||||||
|
first = next(rdr, None)
|
||||||
|
if first is None:
|
||||||
|
return False, None
|
||||||
|
first = [c.strip() for c in first]
|
||||||
|
if first and any(first):
|
||||||
|
break
|
||||||
|
|
||||||
|
norm = [c.lower() for c in first]
|
||||||
|
|
||||||
|
# Header if it contains the expected tokens
|
||||||
|
if all(tok in norm for tok in header_tokens):
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
# If first row is numeric-like and has enough columns => it's data (buffer it back)
|
||||||
|
if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]):
|
||||||
|
return False, first
|
||||||
|
|
||||||
|
# Otherwise treat it as header-ish (skip it)
|
||||||
|
Removal of checks is not neccesary, but is plausible since the source file should only contain correct data Removal of checks is not neccesary, but is plausible since the source file *should* only contain correct data
Resumed checks Resumed checks
|
|||||||
|
return True, None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
This value range check might break future integration with higher precision accelerometer data, hence should be removed This value range check might break future integration with higher precision accelerometer data, hence should be removed
|
|||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
|
if len(row) < 3:
|
||||||
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
x = int(float(row[0]))
|
||||||
|
y = int(float(row[1]))
|
||||||
|
z = int(float(row[2]))
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_gps(row: List[str]) -> Gps:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
return Gps(longitude=lon, latitude=lat)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_number(s: str) -> bool:
|
||||||
|
try:
|
||||||
|
float(s)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
@@ -1,6 +1,4 @@
|
|||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||||
from file_datasource import FileDatasource
|
from file_datasource import FileDatasource
|
||||||
import config
|
import config
|
||||||
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource, delay):
|
def publish(client, topic, datasource):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
while True:
|
while True:
|
||||||
time.sleep(delay)
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
# result: [0, 1]
|
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status == 0:
|
if status != 0:
|
||||||
pass
|
|
||||||
# print(f"Send `{msg}` to topic `{topic}`")
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
|
||||||
@@ -44,9 +37,9 @@ def run():
|
|||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
publish(client, config.MQTT_TOPIC, datasource)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Naming convention is misleading, suggests parser specifically takes 16-bit integers. This parser should read any integer value and should just be
int(row[0]), not a separate function callSeveral identical issues occur in the next few lines
Unused variables, seem to be leftovers from previously removed INT16 boundary checking
removed unnecessary variables
Method is not used and does not provide additional functionality compared to default
int()function, so should be removedFixed: removed int16-specific naming/helpers and now parse accelerometer values directly using int(row[i]) without any bit-width assumptions. Pushed the update to the PR.
Removed the unused _parse_int helper (it didn’t add any functionality over built-in int()). Pushed the update to the PR.
And after chage of rewinders, you can remove _acc_buf to hold first row
What does this means?
if assuming this is a header with indexies of domain objects maybe make sense.
Removed unnecessary buffers like _acc_buf and _gps_buf.
I initially tried removing this part, but the testing failed without it