From 092130dfab184e7f738a77b640b6d1fd56440fc9 Mon Sep 17 00:00:00 2001 From: hasslesstech Date: Tue, 24 Feb 2026 12:48:48 +0200 Subject: [PATCH] lab1: implement UART reader alongside with the file writer usage example --- agent/uart-saver/FileWriter.py | 16 ++++++++ agent/uart-saver/UARTReader.py | 68 ++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 agent/uart-saver/FileWriter.py create mode 100644 agent/uart-saver/UARTReader.py diff --git a/agent/uart-saver/FileWriter.py b/agent/uart-saver/FileWriter.py new file mode 100644 index 0000000..8dccb61 --- /dev/null +++ b/agent/uart-saver/FileWriter.py @@ -0,0 +1,16 @@ +from UARTReader import UARTReader +import os + +PORT = os.environ.get("PORT", None) or "/dev/ttyUSB0" +FILE = os.environ.get("FILE", None) or "savefile.csv" + +r = UARTReader(PORT) + +with open(FILE, "w") as f: + f.write("x,y,z\n") + + for i in range(500): + v = r.read_next() + f.write(",".join(tuple(map(str, v))) + "\n") + +r.close() diff --git a/agent/uart-saver/UARTReader.py b/agent/uart-saver/UARTReader.py new file mode 100644 index 0000000..ad9f48c --- /dev/null +++ b/agent/uart-saver/UARTReader.py @@ -0,0 +1,68 @@ +import os + +SUDO = os.environ.get("SUDO", None) or "sudo" + +class UARTReader: + def __init__(self, port = None): + if port == None: + # testing mode + pass + else: + # configure TTY + os.system(f"{SUDO} stty -F {port} 1:0:8bf:0:3:1c:7f:15:4:5:1:0:11:13:1a:0:12:f:17:16:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0:0") + self.port = open(port, "rb") + + self.i = 0 + self.d = 0 + self.state = 0 + self.data = bytearray(6) + + def close(self): + self.port.close() + + def __to_int(self, x): + if x >= 0x8000: + return x - 0x10000 + else: + return x + + def __frame_seek(self, byte): + if byte == 0xFF: + self.i += 1 + + if self.i >= 10 and byte != 0xFF: + self.i = 0 + + self.data[self.d] = byte + self.d = 1 + + self.state = 1 + + def __data_read(self, byte): + self.data[self.d] = byte + + self.d += 1 + + if self.d >= 6: + self.d = 0 + self.state = 2 + + def read_next(self): + while True: + b = self.port.read(1) + + if not b: + self.close() + return None + + if self.state == 0: + self.__frame_seek(b[0]) + elif self.state == 1: + self.__data_read(b[0]) + + if self.state == 2: + self.state = 0 + + return (self.__to_int(self.data[0] + (self.data[1] << 8)), + self.__to_int(self.data[2] + (self.data[3] << 8)), + self.__to_int(self.data[4] + (self.data[5] << 8)))