mirror of
https://github.com/Rhinemann/IoT-Systems.git
synced 2026-03-14 20:50:39 +02:00
Refactor file rewind logic to skip header row and remove unnecessary buffers
This commit is contained in:
parent
e4be6b0a19
commit
d621390f51
@ -24,8 +24,8 @@ class FileDatasource:
|
|||||||
self._started = False
|
self._started = False
|
||||||
|
|
||||||
# one-row buffers (supports CSVs with or without header)
|
# one-row buffers (supports CSVs with or without header)
|
||||||
self._acc_buf: Optional[List[str]] = None
|
self._acc_has_header: Optional[bool] = None
|
||||||
self._gps_buf: Optional[List[str]] = None
|
self._gps_has_header: Optional[bool] = None
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Must be called before read()"""
|
"""Must be called before read()"""
|
||||||
@ -50,8 +50,8 @@ class FileDatasource:
|
|||||||
if not self._started:
|
if not self._started:
|
||||||
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
acc_row = self._get_next_row(self._acc_reader, self._acc_buf)
|
acc_row = self._get_next_row(self._acc_reader)
|
||||||
gps_row = self._get_next_row(self._gps_reader, self._gps_buf)
|
gps_row = self._get_next_row(self._gps_reader)
|
||||||
|
|
||||||
acc = self._parse_acc(acc_row)
|
acc = self._parse_acc(acc_row)
|
||||||
gps = self._parse_gps(gps_row)
|
gps = self._parse_gps(gps_row)
|
||||||
@ -78,16 +78,12 @@ class FileDatasource:
|
|||||||
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
|
|
||||||
self._acc_buf = None
|
self._acc_has_header = None
|
||||||
self._gps_buf = None
|
self._gps_has_header = None
|
||||||
|
|
||||||
# detect header / buffer first data row (we only need the buffered row)
|
# detect header / buffer first data row (we only need the buffered row)
|
||||||
_, self._acc_buf = self._detect_header_and_buffer(
|
self._rewind_acc()
|
||||||
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
self._rewind_gps()
|
||||||
)
|
|
||||||
_, self._gps_buf = self._detect_header_and_buffer(
|
|
||||||
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
|
||||||
)
|
|
||||||
|
|
||||||
def _close_files(self) -> None:
|
def _close_files(self) -> None:
|
||||||
for f in (self._acc_f, self._gps_f):
|
for f in (self._acc_f, self._gps_f):
|
||||||
@ -101,8 +97,6 @@ class FileDatasource:
|
|||||||
self._gps_f = None
|
self._gps_f = None
|
||||||
self._acc_reader = None
|
self._acc_reader = None
|
||||||
self._gps_reader = None
|
self._gps_reader = None
|
||||||
self._acc_buf = None
|
|
||||||
self._gps_buf = None
|
|
||||||
|
|
||||||
def _rewind_acc(self) -> None:
|
def _rewind_acc(self) -> None:
|
||||||
if self._acc_f is None:
|
if self._acc_f is None:
|
||||||
@ -110,7 +104,7 @@ class FileDatasource:
|
|||||||
self._acc_f.seek(0)
|
self._acc_f.seek(0)
|
||||||
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||||
next(self._acc_reader) # Skip header row
|
next(self._acc_reader) # Skip header row
|
||||||
_, self._acc_buf = self._detect_header_and_buffer(
|
self._acc_has_header, _ = self._detect_header_and_buffer(
|
||||||
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -120,22 +114,17 @@ class FileDatasource:
|
|||||||
self._gps_f.seek(0)
|
self._gps_f.seek(0)
|
||||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||||
next(self._gps_reader) # Skip header row
|
next(self._gps_reader) # Skip header row
|
||||||
_, self._gps_buf = self._detect_header_and_buffer(
|
self._gps_has_header, _ = self._detect_header_and_buffer(
|
||||||
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_next_row(self, reader, buffer) -> List[str]:
|
def _get_next_row(self, reader) -> List[str]:
|
||||||
"""Get the next valid row from the reader or buffer."""
|
"""Get the next valid row from the reader."""
|
||||||
if reader is None:
|
if reader is None:
|
||||||
raise RuntimeError("Reader is not initialized.")
|
raise RuntimeError("Reader is not initialized.")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if buffer is not None:
|
|
||||||
row = buffer
|
|
||||||
buffer = None
|
|
||||||
else:
|
|
||||||
row = next(reader, None)
|
row = next(reader, None)
|
||||||
|
|
||||||
if row is None:
|
if row is None:
|
||||||
# EOF -> rewind & continue
|
# EOF -> rewind & continue
|
||||||
self._rewind_acc() if reader == self._acc_reader else self._rewind_gps()
|
self._rewind_acc() if reader == self._acc_reader else self._rewind_gps()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user