Compare commits
2 Commits
dev
...
lab1_guran
| Author | SHA1 | Date | |
|---|---|---|---|
| 3120e0054c | |||
| f5927d7157 |
25
.gitea/workflows/testing.yaml
Normal file
25
.gitea/workflows/testing.yaml
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
name: Test Agent
|
||||||
|
on: [push]
|
||||||
|
jobs:
|
||||||
|
test-agent-run:
|
||||||
|
runs-on: debian-x86_64
|
||||||
|
steps:
|
||||||
|
- name: Fetch the repository
|
||||||
|
run: git clone --branch ${{ gitea.ref_name }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
|
||||||
|
|
||||||
|
- name: Build containers
|
||||||
|
run: docker-compose -f docker-compose-test.yaml build
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start MQTT broker
|
||||||
|
run: docker-compose -f docker-compose-test.yaml up -d mqtt
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Start agent
|
||||||
|
run: docker-compose -f docker-compose-test.yaml run fake_agent
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
|
|
||||||
|
- name: Clean up
|
||||||
|
if: always()
|
||||||
|
run: docker-compose -f docker-compose-test.yaml down
|
||||||
|
working-directory: sem8-iot-test/agent/docker
|
||||||
34
agent/docker/docker-compose-test.yaml
Normal file
34
agent/docker/docker-compose-test.yaml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
version: "3.3"
|
||||||
|
#name: "road_vision"
|
||||||
|
services:
|
||||||
|
mqtt:
|
||||||
|
image: eclipse-mosquitto
|
||||||
|
container_name: mqtt
|
||||||
|
volumes:
|
||||||
|
- ./mosquitto:/mosquitto
|
||||||
|
- ./mosquitto/data:/mosquitto/data
|
||||||
|
- ./mosquitto/log:/mosquitto/log
|
||||||
|
ports:
|
||||||
|
- 1883:1883
|
||||||
|
- 9001:9001
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
fake_agent:
|
||||||
|
container_name: agent
|
||||||
|
build: ../
|
||||||
|
depends_on:
|
||||||
|
- mqtt
|
||||||
|
environment:
|
||||||
|
MQTT_BROKER_HOST: "mqtt"
|
||||||
|
MQTT_BROKER_PORT: 1883
|
||||||
|
MQTT_TOPIC: "agent_data_topic"
|
||||||
|
DELAY: 0.1
|
||||||
|
MAX_SENDS: 30
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
|
|
||||||
|
|
||||||
|
networks:
|
||||||
|
mqtt_network:
|
||||||
@ -1,4 +1,5 @@
|
|||||||
name: "road_vision"
|
version: "3.3"
|
||||||
|
#name: "road_vision"
|
||||||
services:
|
services:
|
||||||
mqtt:
|
mqtt:
|
||||||
image: eclipse-mosquitto
|
image: eclipse-mosquitto
|
||||||
|
|||||||
@ -16,3 +16,6 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
|
|||||||
|
|
||||||
# Delay for sending data to mqtt in seconds
|
# Delay for sending data to mqtt in seconds
|
||||||
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
|
||||||
|
|
||||||
|
# Testing switches for CI/CD
|
||||||
|
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))
|
||||||
|
|||||||
@ -1,5 +1,11 @@
|
|||||||
from csv import reader
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
from domain.accelerometer import Accelerometer
|
from domain.accelerometer import Accelerometer
|
||||||
from domain.gps import Gps
|
from domain.gps import Gps
|
||||||
from domain.aggregated_data import AggregatedData
|
from domain.aggregated_data import AggregatedData
|
||||||
@ -7,24 +13,213 @@ import config
|
|||||||
|
|
||||||
|
|
||||||
class FileDatasource:
|
class FileDatasource:
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
accelerometer_filename: str,
|
|
||||||
gps_filename: str,
|
|
||||||
) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self) -> AggregatedData:
|
def __init__(self, accelerometer_filename: str, gps_filename: str) -> None:
|
||||||
"""Метод повертає дані отримані з датчиків"""
|
self.accelerometer_filename = accelerometer_filename
|
||||||
return AggregatedData(
|
self.gps_filename = gps_filename
|
||||||
Accelerometer(1, 2, 3),
|
|
||||||
Gps(4, 5),
|
self._acc_f = None
|
||||||
datetime.now(),
|
self._gps_f = None
|
||||||
config.USER_ID,
|
self._acc_reader: Optional[csv.reader] = None
|
||||||
)
|
self._gps_reader: Optional[csv.reader] = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
# one-row buffers (supports CSVs with or without header)
|
||||||
|
self._acc_buf: Optional[List[str]] = None
|
||||||
|
self._gps_buf: Optional[List[str]] = None
|
||||||
|
|
||||||
|
self._acc_has_header: Optional[bool] = None
|
||||||
|
self._gps_has_header: Optional[bool] = None
|
||||||
|
|
||||||
def startReading(self, *args, **kwargs):
|
def startReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись перед початком читання даних"""
|
"""Must be called before read()"""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not Path(self.accelerometer_filename).exists():
|
||||||
|
raise FileNotFoundError(f"Accelerometer file not found: {self.accelerometer_filename}")
|
||||||
|
if not Path(self.gps_filename).exists():
|
||||||
|
raise FileNotFoundError(f"GPS file not found: {self.gps_filename}")
|
||||||
|
|
||||||
|
self._open_files()
|
||||||
|
self._started = True
|
||||||
|
|
||||||
def stopReading(self, *args, **kwargs):
|
def stopReading(self, *args, **kwargs):
|
||||||
"""Метод повинен викликатись для закінчення читання даних"""
|
"""Must be called when finishing reading"""
|
||||||
|
self._close_files()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def read(self) -> AggregatedData:
|
||||||
|
"""Return one combined sample (acc + gps)."""
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("Datasource is not started. Call startReading() before read().")
|
||||||
|
|
||||||
|
acc_row = self._next_acc_row()
|
||||||
|
gps_row = self._next_gps_row()
|
||||||
|
|
||||||
|
acc = self._parse_acc(acc_row)
|
||||||
|
gps = self._parse_gps(gps_row)
|
||||||
|
|
||||||
|
# IMPORTANT: timing belongs to datasource (not MQTT / main.py)
|
||||||
|
if config.DELAY and config.DELAY > 0:
|
||||||
|
time.sleep(float(config.DELAY))
|
||||||
|
|
||||||
|
return AggregatedData(
|
||||||
|
accelerometer=acc,
|
||||||
|
gps=gps,
|
||||||
|
time=datetime.utcnow(),
|
||||||
|
user_id=config.USER_ID,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------- internal ----------------
|
||||||
|
|
||||||
|
def _open_files(self) -> None:
|
||||||
|
self._close_files()
|
||||||
|
|
||||||
|
self._acc_f = open(self.accelerometer_filename, "r", newline="", encoding="utf-8")
|
||||||
|
self._gps_f = open(self.gps_filename, "r", newline="", encoding="utf-8")
|
||||||
|
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _close_files(self) -> None:
|
||||||
|
for f in (self._acc_f, self._gps_f):
|
||||||
|
try:
|
||||||
|
if f is not None:
|
||||||
|
f.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._acc_f = None
|
||||||
|
self._gps_f = None
|
||||||
|
self._acc_reader = None
|
||||||
|
self._gps_reader = None
|
||||||
|
self._acc_buf = None
|
||||||
|
self._gps_buf = None
|
||||||
|
self._acc_has_header = None
|
||||||
|
self._gps_has_header = None
|
||||||
|
|
||||||
|
def _rewind_acc(self) -> None:
|
||||||
|
if self._acc_f is None:
|
||||||
|
raise RuntimeError("Accelerometer file is not open.")
|
||||||
|
self._acc_f.seek(0)
|
||||||
|
self._acc_reader = csv.reader(self._acc_f)
|
||||||
|
self._acc_has_header, self._acc_buf = self._detect_header_and_buffer(
|
||||||
|
self._acc_reader, expected_cols=3, header_tokens=("x", "y", "z")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _rewind_gps(self) -> None:
|
||||||
|
if self._gps_f is None:
|
||||||
|
raise RuntimeError("GPS file is not open.")
|
||||||
|
self._gps_f.seek(0)
|
||||||
|
self._gps_reader = csv.reader(self._gps_f)
|
||||||
|
self._gps_has_header, self._gps_buf = self._detect_header_and_buffer(
|
||||||
|
self._gps_reader, expected_cols=2, header_tokens=("longitude", "latitude")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _next_acc_row(self) -> List[str]:
|
||||||
|
if self._acc_reader is None:
|
||||||
|
raise RuntimeError("Accelerometer reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._acc_buf is not None:
|
||||||
|
row = self._acc_buf
|
||||||
|
self._acc_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._acc_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_acc()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
def _next_gps_row(self) -> List[str]:
|
||||||
|
if self._gps_reader is None:
|
||||||
|
raise RuntimeError("GPS reader is not initialized.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if self._gps_buf is not None:
|
||||||
|
row = self._gps_buf
|
||||||
|
self._gps_buf = None
|
||||||
|
else:
|
||||||
|
row = next(self._gps_reader, None)
|
||||||
|
|
||||||
|
if row is None:
|
||||||
|
# EOF -> rewind & continue
|
||||||
|
self._rewind_gps()
|
||||||
|
continue
|
||||||
|
|
||||||
|
row = [c.strip() for c in row]
|
||||||
|
if not row or not any(row):
|
||||||
|
continue
|
||||||
|
|
||||||
|
return row
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _detect_header_and_buffer(
|
||||||
|
rdr: csv.reader, expected_cols: int, header_tokens: tuple[str, ...]
|
||||||
|
) -> tuple[bool, Optional[List[str]]]:
|
||||||
|
|
||||||
|
first = None
|
||||||
|
while True:
|
||||||
|
first = next(rdr, None)
|
||||||
|
if first is None:
|
||||||
|
return False, None
|
||||||
|
first = [c.strip() for c in first]
|
||||||
|
if first and any(first):
|
||||||
|
break
|
||||||
|
|
||||||
|
norm = [c.lower() for c in first]
|
||||||
|
|
||||||
|
# Header if it contains the expected tokens
|
||||||
|
if all(tok in norm for tok in header_tokens):
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
# If first row is numeric-like and has enough columns => it's data (buffer it back)
|
||||||
|
if len(norm) >= expected_cols and all(FileDatasource._is_number(x) for x in norm[:expected_cols]):
|
||||||
|
return False, first
|
||||||
|
|
||||||
|
# Otherwise treat it as header-ish (skip it)
|
||||||
|
return True, None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_acc(row: List[str]) -> Accelerometer:
|
||||||
|
if len(row) < 3:
|
||||||
|
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
|
||||||
|
x = int(float(row[0]))
|
||||||
|
y = int(float(row[1]))
|
||||||
|
z = int(float(row[2]))
|
||||||
|
return Accelerometer(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_gps(row: List[str]) -> Gps:
|
||||||
|
if len(row) < 2:
|
||||||
|
raise ValueError(f"GPS row must have 2 values (longitude,latitude). Got: {row}")
|
||||||
|
lon = float(row[0])
|
||||||
|
lat = float(row[1])
|
||||||
|
return Gps(longitude=lon, latitude=lat)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_number(s: str) -> bool:
|
||||||
|
try:
|
||||||
|
float(s)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
@ -1,6 +1,4 @@
|
|||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from schema.aggregated_data_schema import AggregatedDataSchema
|
from schema.aggregated_data_schema import AggregatedDataSchema
|
||||||
from file_datasource import FileDatasource
|
from file_datasource import FileDatasource
|
||||||
import config
|
import config
|
||||||
@ -24,29 +22,32 @@ def connect_mqtt(broker, port):
|
|||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def publish(client, topic, datasource, delay):
|
def publish(client, topic, datasource, max_sends = None):
|
||||||
datasource.startReading()
|
datasource.startReading()
|
||||||
|
|
||||||
|
i = 0
|
||||||
while True:
|
while True:
|
||||||
time.sleep(delay)
|
i += 1
|
||||||
|
|
||||||
data = datasource.read()
|
data = datasource.read()
|
||||||
msg = AggregatedDataSchema().dumps(data)
|
msg = AggregatedDataSchema().dumps(data)
|
||||||
result = client.publish(topic, msg)
|
result = client.publish(topic, msg)
|
||||||
# result: [0, 1]
|
|
||||||
status = result[0]
|
status = result[0]
|
||||||
if status == 0:
|
if status != 0:
|
||||||
pass
|
|
||||||
# print(f"Send `{msg}` to topic `{topic}`")
|
|
||||||
else:
|
|
||||||
print(f"Failed to send message to topic {topic}")
|
print(f"Failed to send message to topic {topic}")
|
||||||
|
|
||||||
|
if max_sends and i >= max_sends:
|
||||||
|
# display test success
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
# Prepare mqtt client
|
# Prepare mqtt client
|
||||||
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
client = connect_mqtt(config.MQTT_BROKER_HOST, config.MQTT_BROKER_PORT)
|
||||||
# Prepare datasource
|
# Prepare datasource
|
||||||
datasource = FileDatasource("data/data.csv", "data/gps_data.csv")
|
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
|
||||||
# Infinity publish data
|
# Infinity publish data
|
||||||
publish(client, config.MQTT_TOPIC, datasource, config.DELAY)
|
publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@ -13,6 +13,7 @@ services:
|
|||||||
- 19001:9001
|
- 19001:9001
|
||||||
networks:
|
networks:
|
||||||
mqtt_network:
|
mqtt_network:
|
||||||
|
user: 1000:1000
|
||||||
|
|
||||||
|
|
||||||
edge:
|
edge:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user