Compare commits

...

2 Commits

Author SHA1 Message Date
3120e0054c Реалізовую базовий FileReader та переношу sleep до FileDatasource.read()
Some checks failed
Test Agent / test-agent-run (push) Failing after 4s
2026-02-24 00:16:22 +02:00
f5927d7157 lab1: add CI/CD testing
All checks were successful
Test Agent / test-agent-run (push) Successful in 7s
2026-02-24 00:13:07 +02:00
7 changed files with 289 additions and 29 deletions

View File

@ -0,0 +1,25 @@
name: Test Agent
on: [push]
jobs:
test-agent-run:
runs-on: debian-x86_64
steps:
- name: Fetch the repository
run: git clone --branch ${{ gitea.ref_name }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build containers
run: docker-compose -f docker-compose-test.yaml build
working-directory: sem8-iot-test/agent/docker
- name: Start MQTT broker
run: docker-compose -f docker-compose-test.yaml up -d mqtt
working-directory: sem8-iot-test/agent/docker
- name: Start agent
run: docker-compose -f docker-compose-test.yaml run fake_agent
working-directory: sem8-iot-test/agent/docker
- name: Clean up
if: always()
run: docker-compose -f docker-compose-test.yaml down
working-directory: sem8-iot-test/agent/docker

View File

@ -0,0 +1,34 @@
version: "3.3"
#name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
fake_agent:
container_name: agent
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
MAX_SENDS: 30
networks:
mqtt_network:
networks:
mqtt_network:

View File

@ -1,4 +1,5 @@
name: "road_vision" version: "3.3"
#name: "road_vision"
services: services:
mqtt: mqtt:
image: eclipse-mosquitto image: eclipse-mosquitto

View File

@ -16,3 +16,6 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
# Delay for sending data to mqtt in seconds # Delay for sending data to mqtt in seconds
DELAY = try_parse(float, os.environ.get("DELAY")) or 1 DELAY = try_parse(float, os.environ.get("DELAY")) or 1
# Testing switches for CI/CD
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))

View File

@ -1,5 +1,11 @@
from csv import reader from __future__ import annotations
import csv
import time
from datetime import datetime from datetime import datetime
from pathlib import Path
from typing import Optional, List
from domain.accelerometer import Accelerometer from domain.accelerometer import Accelerometer
from domain.gps import Gps from domain.gps import Gps
from domain.aggregated_data import AggregatedData from domain.aggregated_data import AggregatedData
@ -7,24 +13,213 @@ import config
class FileDatasource: class FileDatasource:
def __init__(
self,
accelerometer_filename: str,
gps_filename: str,
) -> None:
pass
def read(self) -> AggregatedData: def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
"""Метод повертає дані отримані з датчиків""" self.accelerometer_filename = accelerometer_filename
return AggregatedData( self.gps_filename = gps_filename
Accelerometer(1, 2, 3),
Gps(4, 5), self._acc_f = None
datetime.now(), self._gps_f = None
config.USER_ID, self._acc_reader: Optional[csv.reader] = None
) self._gps_reader: Optional[csv.reader] = None
self._started = False
# one-row buffers (supports CSVs with or without header)
self._acc_buf: Optional[List[str]] = None
self._gps_buf: Optional[List[str]] = None
self._acc_has_header: Optional[bool] = None
self._gps_has_header: Optional[bool] = None
def startReading(self, *args, **kwargs): def startReading(self, *args, **kwargs):
"""Метод повинен викликатись перед початком читання даних""" """Must be called before read()"""
if self._started:
return
if not Path(self.accelerometer_filename).exists():
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
if not Path(self.gps_filename).exists():
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
self._open_files()
self._started = True
def stopReading(self, *args, **kwargs): def stopReading(self, *args, **kwargs):
"""Метод повинен викликатись для закінчення читання даних""" """Must be called when finishing reading"""
self._close_files()
self._started = False
def read(self) -> AggregatedData:
"""Return one combined sample (acc + gps)."""
if not self._started:
raise RuntimeError("Datasource is not started. Call startReading() before read().")
acc_row = self._next_acc_row()
gps_row = self._next_gps_row()
acc = self._parse_acc(acc_row)
gps = self._parse_gps(gps_row)
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
if config.DELAY and config.DELAY > 0:
time.sleep(float(config.DELAY))
return AggregatedData(
accelerometer=acc,
gps=gps,
time=datetime.utcnow(),
user_id=config.USER_ID,
)
# ---------------- internal ----------------
def _open_files(self) -> None:
self._close_files()
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
self._acc_reader = csv.reader(self._acc_f)
self._gps_reader = csv.reader(self._gps_f)
self._acc_buf = None
self._gps_buf = None
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
)
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
)
def _close_files(self) -> None:
for f in (self._acc_f, self._gps_f):
try:
if f is not None:
f.close()
except Exception:
pass
self._acc_f = None
self._gps_f = None
self._acc_reader = None
self._gps_reader = None
self._acc_buf = None
self._gps_buf = None
self._acc_has_header = None
self._gps_has_header = None
def _rewind_acc(self) -> None:
if self._acc_f is None:
raise RuntimeError("Accelerometer file is not open.")
self._acc_f.seek(0)
self._acc_reader = csv.reader(self._acc_f)
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
)
def _rewind_gps(self) -> None:
if self._gps_f is None:
raise RuntimeError("GPS file is not open.")
self._gps_f.seek(0)
self._gps_reader = csv.reader(self._gps_f)
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
)
def _next_acc_row(self) -> List[str]:
if self._acc_reader is None:
raise RuntimeError("Accelerometer reader is not initialized.")
while True:
if self._acc_buf is not None:
row = self._acc_buf
self._acc_buf = None
else:
row = next(self._acc_reader, None)
if row is None:
# EOF -> rewind & continue
self._rewind_acc()
continue
row = [c.strip() for c in row]
if not row or not any(row):
continue
return row
def _next_gps_row(self) -> List[str]:
if self._gps_reader is None:
raise RuntimeError("GPS reader is not initialized.")
while True:
if self._gps_buf is not None:
row = self._gps_buf
self._gps_buf = None
else:
row = next(self._gps_reader, None)
if row is None:
# EOF -> rewind & continue
self._rewind_gps()
continue
row = [c.strip() for c in row]
if not row or not any(row):
continue
return row
@staticmethod
def _detect_header_and_buffer(
rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...]
) -> tuple[bool, Optional[List[str]]]:
first = None
while True:
first = next(rdr, None)
if first is None:
return False, None
first = [c.strip() for c in first]
if first and any(first):
break
norm = [c.lower() for c in first]
# Header if it contains the expected tokens
if all(tok in norm for tok in header_tokens):
return True, None
# If first row is numeric-like and has enough columns => it's data (buffer it back)
if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]):
return False, first
# Otherwise treat it as header-ish (skip it)
return True, None
@staticmethod
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
x = int(float(row[0]))
y = int(float(row[1]))
z = int(float(row[2]))
return Accelerometer(x=x, y=y, z=z)
@staticmethod
def _parse_gps(row: List[str]) -> Gps:
if len(row) < 2:
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
lon = float(row[0])
lat = float(row[1])
return Gps(longitude=lon, latitude=lat)
@staticmethod
def _is_number(s: str) -> bool:
try:
float(s)
return True
except Exception:
return False

View File

@ -1,6 +1,4 @@
from paho.mqtt import client as mqtt_client from paho.mqtt import client as mqtt_client
import json
import time
from schema.aggregated_data_schema import AggregatedDataSchema from schema.aggregated_data_schema import AggregatedDataSchema
from file_datasource import FileDatasource from file_datasource import FileDatasource
import config import config
@ -24,29 +22,32 @@ def connect_mqtt(broker, port):
return client return client
def publish(client, topic, datasource, delay): def publish(client, topic, datasource, max_sends = None):
datasource.startReading() datasource.startReading()
i = 0
while True: while True:
time.sleep(delay) i += 1
data = datasource.read() data = datasource.read()
msg = AggregatedDataSchema().dumps(data) msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg) result = client.publish(topic, msg)
# result: [0, 1]
status = result[0] status = result[0]
if status == 0: if status != 0:
pass
# print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}") print(f"Failed to send message to topic {topic}")
if max_sends and i >= max_sends:
# display test success
exit(0)
def run(): def run():
# Prepare mqtt client # Prepare mqtt client
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT) client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
# Prepare datasource # Prepare datasource
datasource = FileDatasource("data/data.csv", "data/gps_data.csv") datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
# Infinity publish data # Infinity publish data
publish(client, config.MQTT_TOPIC, datasource, config.DELAY) publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -13,6 +13,7 @@ services:
- 19001:9001 - 19001:9001
networks: networks:
mqtt_network: mqtt_network:
user: 1000:1000
edge: edge: