Compare commits
19 Commits
lab1_shmul
...
lab1-slobo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af94c007a2 | ||
|
|
f9ef916331 | ||
|
|
7ddfb68b02 | ||
| 9473c5a621 | |||
| 953b0bdb9a | |||
| f58596ebf7 | |||
| d621390f51 | |||
| e4be6b0a19 | |||
| fe66df9b8c | |||
| 3e0b4762ef | |||
| 75613fd4fc | |||
| a25fbfc3ef | |||
| ca790e7306 | |||
| 1643767094 | |||
| 3d94bf3008 | |||
| c5d98d53cd | |||
| 07a0e906d8 | |||
|
|
9bf3741f32 | ||
| c974ac32f6 |
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
agent/docker/mosquitto/data/
|
||||
agent/docker/mosquitto/log/
|
||||
|
||||
.idea/
|
||||
0
agent/src/__init__.py
Normal file
0
agent/src/__init__.py
Normal file
22
agent/src/data/parking.csv
Normal file
22
agent/src/data/parking.csv
Normal file
@@ -0,0 +1,22 @@
|
||||
longitude,latitude,empty_count
|
||||
50.450386085935094,30.524547100067142,10
|
||||
50.450386085935094,30.524547100067142,11
|
||||
50.450386085935094,30.524547100067142,13
|
||||
50.450386085935094,30.524547100067142,15
|
||||
50.450386085935094,30.524547100067142,7
|
||||
50.450386085935094,30.524547100067142,9
|
||||
50.450386085935094,30.524547100067142,4
|
||||
50.450386085935094,30.524547100067142,0
|
||||
50.450386085935094,30.524547100067142,0
|
||||
50.450386085935094,30.524547100067142,3
|
||||
50.450386085935094,30.524547100067142,4
|
||||
50.450069433207545,30.52406822530458,16
|
||||
50.450069433207545,30.52406822530458,20
|
||||
50.450069433207545,30.52406822530458,25
|
||||
50.450069433207545,30.52406822530458,30
|
||||
50.450069433207545,30.52406822530458,29
|
||||
50.450069433207545,30.52406822530458,12
|
||||
50.450069433207545,30.52406822530458,10
|
||||
50.450069433207545,30.52406822530458,14
|
||||
50.450069433207545,30.52406822530458,3
|
||||
50.450069433207545,30.52406822530458,2
|
||||
|
@@ -1,13 +1,16 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from domain.accelerometer import Accelerometer
|
||||
from domain.gps import Gps
|
||||
from domain.parking import Parking
|
||||
|
||||
|
||||
@dataclass
|
||||
class AggregatedData:
|
||||
accelerometer: Accelerometer
|
||||
gps: Gps
|
||||
parking: Parking
|
||||
timestamp: datetime
|
||||
user_id: int
|
||||
|
||||
9
agent/src/domain/parking.py
Normal file
9
agent/src/domain/parking.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from domain.gps import Gps
|
||||
|
||||
|
||||
@dataclass
|
||||
class Parking:
|
||||
empty_count: int
|
||||
gps: Gps
|
||||
@@ -1,5 +1,10 @@
|
||||
from csv import reader
|
||||
import csv
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
|
||||
from domain.parking import Parking
|
||||
from domain.accelerometer import Accelerometer
|
||||
from domain.gps import Gps
|
||||
from domain.aggregated_data import AggregatedData
|
||||
@@ -7,24 +12,185 @@ import config
|
||||
|
||||
|
||||
class FileDatasource:
|
||||
def __init__(
|
||||
self,
|
||||
accelerometer_filename: str,
|
||||
gps_filename: str,
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
def read(self) -> AggregatedData:
|
||||
"""Метод повертає дані отримані з датчиків"""
|
||||
return AggregatedData(
|
||||
Accelerometer(1, 2, 3),
|
||||
Gps(4, 5),
|
||||
datetime.now(),
|
||||
config.USER_ID,
|
||||
)
|
||||
def __init__(
|
||||
self,
|
||||
accelerometer_filename: str,
|
||||
gps_filename: str,
|
||||
park_filename: str,
|
||||
) -> None:
|
||||
|
||||
self.accelerometer_filename = accelerometer_filename
|
||||
self.park_filename = park_filename
|
||||
self.gps_filename = gps_filename
|
||||
|
||||
self._park_f = None
|
||||
self._acc_f = None
|
||||
self._gps_f = None
|
||||
|
||||
self._park_reader: Optional[csv.reader] = None
|
||||
self._acc_reader: Optional[csv.reader] = None
|
||||
self._gps_reader: Optional[csv.reader] = None
|
||||
|
||||
self._started = False
|
||||
|
||||
def startReading(self, *args, **kwargs):
|
||||
"""Метод повинен викликатись перед початком читання даних"""
|
||||
"""Must be called before read()"""
|
||||
if self._started:
|
||||
return
|
||||
|
||||
if not Path(self.accelerometer_filename).exists():
|
||||
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||
if not Path(self.park_filename).exists():
|
||||
raise FileNotFoundError(f"Accelerometer file not found: {self.park_filename}")
|
||||
if not Path(self.gps_filename).exists():
|
||||
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||
|
||||
self._open_files()
|
||||
self._started = True
|
||||
|
||||
def stopReading(self, *args, **kwargs):
|
||||
"""Метод повинен викликатись для закінчення читання даних"""
|
||||
"""Must be called when finishing reading"""
|
||||
self._close_files()
|
||||
self._started = False
|
||||
|
||||
def read(self) -> AggregatedData:
|
||||
"""Return one combined sample (acc + gps)."""
|
||||
if not self._started:
|
||||
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||
|
||||
acc_row = self._get_next_row(self._acc_reader, source="acc")
|
||||
park_row = self._get_next_row(self._park_reader, source="park")
|
||||
gps_row = self._get_next_row(self._gps_reader, source="gps")
|
||||
|
||||
acc = self._parse_acc(acc_row)
|
||||
park = self._parse_park(park_row)
|
||||
gps = self._parse_gps(gps_row)
|
||||
|
||||
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||
if config.DELAY and config.DELAY > 0:
|
||||
time.sleep(float(config.DELAY))
|
||||
|
||||
return AggregatedData(
|
||||
accelerometer=acc,
|
||||
gps=gps,
|
||||
parking=park,
|
||||
timestamp=datetime.utcnow(),
|
||||
user_id=config.USER_ID,
|
||||
)
|
||||
|
||||
# ---------------- internal ----------------
|
||||
|
||||
def _open_files(self) -> None:
|
||||
self._close_files()
|
||||
|
||||
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||
self._park_f = open(self.park_filename, "r", newline="", encoding="utf-8")
|
||||
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||
|
||||
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||
|
||||
# File pointer is already at 0 right after open(), so no need to rewind here.
|
||||
# Skip header row once.
|
||||
next(self._acc_reader, None)
|
||||
next(self._park_reader, None)
|
||||
next(self._gps_reader, None)
|
||||
|
||||
def _close_files(self) -> None:
|
||||
for f in (self._acc_f, self._gps_f):
|
||||
try:
|
||||
if f is not None:
|
||||
f.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._acc_f = None
|
||||
self._park_f = None
|
||||
self._gps_f = None
|
||||
self._acc_reader = None
|
||||
self._park_reader = None
|
||||
self._gps_reader = None
|
||||
|
||||
def _rewind_acc(self) -> None:
|
||||
if self._acc_f is None:
|
||||
raise RuntimeError("Accelerometer file is not open.")
|
||||
self._acc_f.seek(0)
|
||||
self._acc_reader = csv.reader(self._acc_f, skipinitialspace=True)
|
||||
next(self._acc_reader, None) # skip header row
|
||||
|
||||
def _rewind_gps(self) -> None:
|
||||
if self._gps_f is None:
|
||||
raise RuntimeError("GPS file is not open.")
|
||||
self._gps_f.seek(0)
|
||||
self._gps_reader = csv.reader(self._gps_f, skipinitialspace=True)
|
||||
next(self._gps_reader, None) # skip header row
|
||||
|
||||
def _rewind_park(self) -> None:
|
||||
if self._park_f is None:
|
||||
raise RuntimeError("GPS file is not open.")
|
||||
self._park_f.seek(0)
|
||||
self._park_reader = csv.reader(self._park_f, skipinitialspace=True)
|
||||
next(self._park_reader, None) # skip header row
|
||||
|
||||
def _get_next_row(self, reader, source: str) -> List[str]:
|
||||
"""Get the next valid row from the reader."""
|
||||
if reader is None:
|
||||
raise RuntimeError("Reader is not initialized.")
|
||||
|
||||
while True:
|
||||
row = next(reader, None)
|
||||
if row is None:
|
||||
# EOF -> rewind & continue
|
||||
if source == "acc":
|
||||
self._rewind_acc()
|
||||
reader = self._acc_reader
|
||||
|
||||
elif source == 'park':
|
||||
self._rewind_park()
|
||||
reader = self._park_reader
|
||||
else:
|
||||
self._rewind_gps()
|
||||
reader = self._gps_reader
|
||||
continue
|
||||
|
||||
if not row or not any(cell.strip() for cell in row):
|
||||
continue
|
||||
|
||||
return row
|
||||
|
||||
@staticmethod
|
||||
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||
if len(row) < 3:
|
||||
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||
|
||||
try:
|
||||
x = int(row[0])
|
||||
y = int(row[1])
|
||||
z = int(row[2])
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid accelerometer values (expected integers): {row}") from e
|
||||
|
||||
return Accelerometer(x=x, y=y, z=z)
|
||||
|
||||
@staticmethod
|
||||
def _parse_gps(row: List[str]) -> Gps:
|
||||
if len(row) < 2:
|
||||
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||
lon = float(row[0])
|
||||
lat = float(row[1])
|
||||
return Gps(longitude=lon, latitude=lat)
|
||||
|
||||
@staticmethod
|
||||
def _parse_park(row: List[str]) -> Parking:
|
||||
if len(row) < 2:
|
||||
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||
lon = float(row[0])
|
||||
lat = float(row[1])
|
||||
empty_count = int(row[2])
|
||||
|
||||
return Parking(
|
||||
gps=Gps(longitude=lon, latitude=lat),
|
||||
empty_count=empty_count
|
||||
)
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
from paho.mqtt import client as mqtt_client
|
||||
import json
|
||||
import time
|
||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||
from file_datasource import FileDatasource
|
||||
import config
|
||||
@@ -24,19 +22,14 @@ def connect_mqtt(broker, port):
|
||||
return client
|
||||
|
||||
|
||||
def publish(client, topic, datasource, delay):
|
||||
def publish(client, topic, datasource):
|
||||
datasource.startReading()
|
||||
while True:
|
||||
time.sleep(delay)
|
||||
data = datasource.read()
|
||||
msg = AggregatedDataSchema().dumps(data)
|
||||
result = client.publish(topic, msg)
|
||||
# result: [0, 1]
|
||||
status = result[0]
|
||||
if status == 0:
|
||||
pass
|
||||
# print(f"Send `{msg}` to topic `{topic}`")
|
||||
else:
|
||||
if status != 0:
|
||||
print(f"Failed to send message to topic {topic}")
|
||||
|
||||
|
||||
@@ -44,9 +37,9 @@ def run():
|
||||
# Prepare mqtt client
|
||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||
# Prepare datasource
|
||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
||||
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv", "data/parking.csv")
|
||||
# Infinity publish data
|
||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
||||
publish(client, config.MQTT_TOPIC, datasource)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
from marshmallow import Schema, fields
|
||||
from schema.accelerometer_schema import AccelerometerSchema
|
||||
from schema.gps_schema import GpsSchema
|
||||
from schema.parking_schema import ParkingSchema
|
||||
|
||||
|
||||
class AggregatedDataSchema(Schema):
|
||||
accelerometer = fields.Nested(AccelerometerSchema)
|
||||
gps = fields.Nested(GpsSchema)
|
||||
parking = fields.Nested(ParkingSchema)
|
||||
timestamp = fields.DateTime("iso")
|
||||
user_id = fields.Int()
|
||||
|
||||
8
agent/src/schema/parking_schema.py
Normal file
8
agent/src/schema/parking_schema.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from marshmallow import Schema, fields
|
||||
|
||||
from schema.gps_schema import GpsSchema
|
||||
|
||||
|
||||
class ParkingSchema(Schema):
|
||||
gps = fields.Nested(GpsSchema)
|
||||
empty_count = fields.Int()
|
||||
Reference in New Issue
Block a user