Compare commits

...

19 Commits

Author SHA1 Message Date
ІМ-24 Владислав Коваленко
af94c007a2 fixes in imports and schemas 2026-02-26 10:54:34 +00:00
Senya
f9ef916331 SCRUM-34: slobodeniuk parking signals 2026-02-26 12:11:07 +02:00
VladiusVostokus
7ddfb68b02 Merge pull request #5 from Rhinemann/lab1_huranets
FileReader
2026-02-26 08:04:10 +00:00
9473c5a621 Remove unnecessary rewind after file open 2026-02-25 21:08:54 +02:00
953b0bdb9a Remove unused _detect_header_and_buffer method and related fields 2026-02-25 20:56:43 +02:00
f58596ebf7 Refactor FileDatasource: remove unused header detection variables 2026-02-25 12:35:58 +02:00
d621390f51 Refactor file rewind logic to skip header row and remove unnecessary buffers 2026-02-25 12:11:05 +02:00
e4be6b0a19 Refactor file rewinding logic to skip header row after seek(0) 2026-02-25 12:00:39 +02:00
fe66df9b8c Refactor row reading logic for clarity and efficiency 2026-02-25 11:50:56 +02:00
3e0b4762ef Optimize CSV parsing by adding skipinitialspace=True to csv.reader and removing unnecessary strip() calls 2026-02-24 22:11:37 +02:00
75613fd4fc Restore input validation for accelerometer parsing 2026-02-24 20:47:00 +02:00
a25fbfc3ef Simplify accelerometer parsing and remove int16-specific leftovers 2026-02-24 20:14:56 +02:00
ca790e7306 Remove int16 binding from datasource 2026-02-24 19:55:50 +02:00
1643767094 Remove int16 range check per review 2026-02-24 15:41:41 +02:00
3d94bf3008 Parse accelerometer values as int16 (remove float conversion) 2026-02-24 15:12:49 +02:00
c5d98d53cd Add mosquitto runtime folders to gitignore 2026-02-24 14:25:27 +02:00
07a0e906d8 Fix timestamp field in AggregatedData 2026-02-24 14:21:41 +02:00
VladiusVostokus
9bf3741f32 Merge pull request #4 from Rhinemann/lab1_shmuliar
[SCRUM-40] [L1] Використання STM32 як акселерометра
2026-02-24 11:37:27 +00:00
c974ac32f6 Реалізовую базовий FileReader та переношу sleep до FileDatasource.read() 2026-02-23 22:01:11 +02:00
9 changed files with 235 additions and 28 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
agent/docker/mosquitto/data/
agent/docker/mosquitto/log/
.idea/

0
agent/src/__init__.py Normal file
View File

View File

@@ -0,0 +1,22 @@
longitude,latitude,empty_count
50.450386085935094,30.524547100067142,10
50.450386085935094,30.524547100067142,11
50.450386085935094,30.524547100067142,13
50.450386085935094,30.524547100067142,15
50.450386085935094,30.524547100067142,7
50.450386085935094,30.524547100067142,9
50.450386085935094,30.524547100067142,4
50.450386085935094,30.524547100067142,0
50.450386085935094,30.524547100067142,0
50.450386085935094,30.524547100067142,3
50.450386085935094,30.524547100067142,4
50.450069433207545,30.52406822530458,16
50.450069433207545,30.52406822530458,20
50.450069433207545,30.52406822530458,25
50.450069433207545,30.52406822530458,30
50.450069433207545,30.52406822530458,29
50.450069433207545,30.52406822530458,12
50.450069433207545,30.52406822530458,10
50.450069433207545,30.52406822530458,14
50.450069433207545,30.52406822530458,3
50.450069433207545,30.52406822530458,2
1 longitude latitude empty_count
2 50.450386085935094 30.524547100067142 10
3 50.450386085935094 30.524547100067142 11
4 50.450386085935094 30.524547100067142 13
5 50.450386085935094 30.524547100067142 15
6 50.450386085935094 30.524547100067142 7
7 50.450386085935094 30.524547100067142 9
8 50.450386085935094 30.524547100067142 4
9 50.450386085935094 30.524547100067142 0
10 50.450386085935094 30.524547100067142 0
11 50.450386085935094 30.524547100067142 3
12 50.450386085935094 30.524547100067142 4
13 50.450069433207545 30.52406822530458 16
14 50.450069433207545 30.52406822530458 20
15 50.450069433207545 30.52406822530458 25
16 50.450069433207545 30.52406822530458 30
17 50.450069433207545 30.52406822530458 29
18 50.450069433207545 30.52406822530458 12
19 50.450069433207545 30.52406822530458 10
20 50.450069433207545 30.52406822530458 14
21 50.450069433207545 30.52406822530458 3
22 50.450069433207545 30.52406822530458 2

View File

@@ -1,13 +1,16 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from domain.accelerometer import Accelerometer from domain.accelerometer import Accelerometer
from domain.gps import Gps from domain.gps import Gps
from domain.parking import Parking
@dataclass @dataclass
class AggregatedData: class AggregatedData:
accelerometer: Accelerometer accelerometer: Accelerometer
gps: Gps gps: Gps
parking: Parking
timestamp: datetime timestamp: datetime
user_id: int user_id: int

View File

@@ -0,0 +1,9 @@
from dataclasses import dataclass
from domain.gps import Gps
@dataclass
class Parking:
empty_count: int
gps: Gps

View File

@@ -1,5 +1,10 @@
from csv import reader import csv
import time
from datetime import datetime from datetime import datetime
from pathlib import Path
from typing import Optional, List
from domain.parking import Parking
from domain.accelerometer import Accelerometer from domain.accelerometer import Accelerometer
from domain.gps import Gps from domain.gps import Gps
from domain.aggregated_data import AggregatedData from domain.aggregated_data import AggregatedData
@@ -7,24 +12,185 @@ import config
class FileDatasource: class FileDatasource:
def __init__(
self,
accelerometer_filename: str,
gps_filename: str,
) -> None:
pass
def read(self) -> AggregatedData: def __init__(
"""Метод повертає дані отримані з датчиків""" self,
return AggregatedData( accelerometer_filename: str,
Accelerometer(1, 2, 3), gps_filename: str,
Gps(4, 5), park_filename: str,
datetime.now(), ) -> None:
config.USER_ID,
) self.accelerometer_filename = accelerometer_filename
self.park_filename = park_filename
self.gps_filename = gps_filename
self._park_f = None
self._acc_f = None
self._gps_f = None
self._park_reader: Optional[csv.reader] = None
self._acc_reader: Optional[csv.reader] = None
self._gps_reader: Optional[csv.reader] = None
self._started = False
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Метод повинен викликатись перед початком читання даних""" """Must be called before read()"""
if self._started:
return
if not Path(self.accelerometer_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
if not Path(self.park_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}")
if not Path(self.gps_filename).exists():
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
self._open_files()
self._started = True
def stopReading(self, *args, **kwargs): def stopReading(self, *args, **kwargs):
"""Метод повинен викликатись для закінчення читання даних""" """Must be called when finishing reading"""
self._close_files()
self._started = False
def read(self) -> AggregatedData:
"""Return one combined sample (acc + gps)."""
if not self._started:
raise RuntimeError("Datasource is not started. Call startReading() before read().")
acc_row = self._get_next_row(self._acc_reader, source="acc")
park_row = self._get_next_row(self._park_reader, source="park")
gps_row = self._get_next_row(self._gps_reader, source="gps")
acc = self._parse_acc(acc_row)
park = self._parse_park(park_row)
gps = self._parse_gps(gps_row)
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
if config.DELAY and config.DELAY > 0:
time.sleep(float(config.DELAY))
return AggregatedData(
accelerometer=acc,
gps=gps,
parking=park,
timestamp=datetime.utcnow(),
user_id=config.USER_ID,
)
# ---------------- internal ----------------
def _open_files(self) -> None:
self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
# File pointer is already at 0 right after open(), so no need to rewind here.
# Skip header row once.
next(self._acc_reader, None)
next(self._park_reader, None)
next(self._gps_reader, None)
def _close_files(self) -> None:
for f in (self._acc_f, self._gps_f):
try:
if f is not None:
f.close()
except Exception:
pass
self._acc_f = None
self._park_f = None
self._gps_f = None
self._acc_reader = None
self._park_reader = None
self._gps_reader = None
def _rewind_acc(self) -> None:
if self._acc_f is None:
raise RuntimeError("Accelerometer file is not open.")
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
next(self._acc_reader, None) # skip header row
def _rewind_gps(self) -> None:
if self._gps_f is None:
raise RuntimeError("GPS file is not open.")
self._gps_f.seek(0)
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
next(self._gps_reader, None) # skip header row
def _rewind_park(self) -> None:
if self._park_f is None:
raise RuntimeError("GPS file is not open.")
self._park_f.seek(0)
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
next(self._park_reader, None) # skip header row
def _get_next_row(self, reader, source: str) -> List[str]:
"""Get the next valid row from the reader."""
if reader is None:
raise RuntimeError("Reader is not initialized.")
while True:
row = next(reader, None)
if row is None:
# EOF -> rewind & continue
if source == "acc":
self._rewind_acc()
reader = self._acc_reader
elif source == 'park':
self._rewind_park()
reader = self._park_reader
else:
self._rewind_gps()
reader = self._gps_reader
continue
if not row or not any(cell.strip() for cell in row):
continue
return row
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
except ValueError as e:
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
return Accelerometer(x=x, y=y, z=z)
@staticmethod
def _parse_gps(row: List[str]) -> Gps:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
return Gps(longitude=lon, latitude=lat)
@staticmethod
def _parse_park(row: List[str]) -> Parking:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
empty_count = int(row[2])
return Parking(
gps=Gps(longitude=lon, latitude=lat),
empty_count=empty_count
)

View File

@@ -1,6 +1,4 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
import json
import time
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import config import config
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
return client return client
def publish(client, topic, datasource, delay): def publish(client, topic, datasource):
datasource.startReading() datasource.startReading()
while True: while True:
time.sleep(delay)
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
# result: [0, 1]
status = result[0] status = result[0]
if status == 0: if status != 0:
pass
# print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}") print(f"Failed to send message to topic {topic}")
@@ -44,9 +37,9 @@ def run():
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/data.csv", "data/gps_data.csv") datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource, config.DELAY) publish(client, config.MQTT_TOPIC, datasource)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,10 +1,12 @@
from marshmallow import Schema, fields from marshmallow import Schema, fields
from schema.accelerometer_schema import AccelerometerSchema from schema.accelerometer_schema import AccelerometerSchema
from schema.gps_schema import GpsSchema from schema.gps_schema import GpsSchema
from schema.parking_schema import ParkingSchema
class AggregatedDataSchema(Schema): class AggregatedDataSchema(Schema):
accelerometer = fields.Nested(AccelerometerSchema) accelerometer = fields.Nested(AccelerometerSchema)
gps = fields.Nested(GpsSchema) gps = fields.Nested(GpsSchema)
parking = fields.Nested(ParkingSchema)
timestamp = fields.DateTime("iso") timestamp = fields.DateTime("iso")
user_id = fields.Int() user_id = fields.Int()

View File

@@ -0,0 +1,8 @@
from marshmallow import Schema, fields
from schema.gps_schema import GpsSchema
class ParkingSchema(Schema):
gps = fields.Nested(GpsSchema)
empty_count = fields.Int()