8 Commits

8 changed files with 18 additions and 80 deletions
-25
View File
@@ -1,25 +0,0 @@
name: Test Agent
on: [push]
jobs:
test-agent-run:
runs-on: debian-x86_64
steps:
- name: Fetch the repository
run: git clone --branch ${{ gitea.ref_name }} --depth 1 ${{ gitea.server_url }}/${{ gitea.repository }}
- name: Build containers
run: docker-compose -f docker-compose-test.yaml build
working-directory: sem8-iot-test/agent/docker
- name: Start MQTT broker
run: docker-compose -f docker-compose-test.yaml up -d mqtt
working-directory: sem8-iot-test/agent/docker
- name: Start agent
run: docker-compose -f docker-compose-test.yaml run fake_agent
working-directory: sem8-iot-test/agent/docker
- name: Clean up
if: always()
run: docker-compose -f docker-compose-test.yaml down
working-directory: sem8-iot-test/agent/docker
+2
View File
@@ -0,0 +1,2 @@
agent/docker/mosquitto/data/
agent/docker/mosquitto/log/
-34
View File
@@ -1,34 +0,0 @@
version: "3.3"
#name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
volumes:
- ./mosquitto:/mosquitto
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- 1883:1883
- 9001:9001
networks:
mqtt_network:
fake_agent:
container_name: agent
build: ../
depends_on:
- mqtt
environment:
MQTT_BROKER_HOST: "mqtt"
MQTT_BROKER_PORT: 1883
MQTT_TOPIC: "agent_data_topic"
DELAY: 0.1
MAX_SENDS: 30
networks:
mqtt_network:
networks:
mqtt_network:
+2 -2
View File
@@ -1,5 +1,5 @@
version: "3.3"
#name: "road_vision"
version: "3.9"
name: "road_vision"
services:
mqtt:
image: eclipse-mosquitto
-3
View File
@@ -16,6 +16,3 @@ MQTT_TOPIC = os.environ.get("MQTT_TOPIC") or "agent"
# Delay for sending data to mqtt in seconds
DELAY = try_parse(float, os.environ.get("DELAY")) or 1
# Testing switches for CI/CD
MAX_SENDS = try_parse(int, os.environ.get("MAX_SENDS"))
+11 -4
View File
@@ -68,7 +68,7 @@ class FileDatasource:
return AggregatedData(
accelerometer=acc,
gps=gps,
time=datetime.utcnow(),
timestamp=datetime.utcnow(),
user_id=config.USER_ID,
)
@@ -203,9 +203,16 @@ class FileDatasource:
def _parse_acc(row: List[str]) -> Accelerometer:
if len(row) < 3:
raise ValueError(f"Accelerometer row must have 3 values (x,y,z). Got: {row}")
x = int(float(row[0]))
y = int(float(row[1]))
z = int(float(row[2]))
try:
x = int(row[0])
y = int(row[1])
z = int(row[2])
except ValueError as e:
raise ValueError(
f"Invalid accelerometer values (expected integers): {row}"
) from e
return Accelerometer(x=x, y=y, z=z)
@staticmethod
+3 -11
View File
@@ -22,13 +22,9 @@ def connect_mqtt(broker, port):
return client
def publish(client, topic, datasource, max_sends = None):
def publish(client, topic, datasource):
datasource.startReading()
i = 0
while True:
i += 1
data = datasource.read()
msg = AggregatedDataSchema().dumps(data)
result = client.publish(topic, msg)
@@ -36,10 +32,6 @@ def publish(client, topic, datasource, max_sends = None):
if status != 0:
print(f"Failed to send message to topic {topic}")
if max_sends and i >= max_sends:
# display test success
exit(0)
def run():
# Prepare mqtt client
@@ -47,8 +39,8 @@ def run():
# Prepare datasource
datasource = FileDatasource("data/accelerometer.csv", "data/gps.csv")
# Infinity publish data
publish(client, config.MQTT_TOPIC, datasource, getattr(config, "MAX_SENDS", None))
publish(client, config.MQTT_TOPIC, datasource)
if __name__ == "__main__":
run()
run()
-1
View File
@@ -13,7 +13,6 @@ services:
- 19001:9001
networks:
mqtt_network:
user: 1000:1000
edge: