From 0097e71e2994899477b57810b3ad8f28faa78aa5 Mon Sep 17 00:00:00 2001 From: dymik739 Date: Fri, 25 Aug 2023 12:28:39 +0300 Subject: [PATCH] improve backward compatibility and fix minor bugs --- src/server.py | 469 +++++++++++++++++++++++++------------------------- 1 file changed, 235 insertions(+), 234 deletions(-) diff --git a/src/server.py b/src/server.py index aaeb1c3..b182c29 100644 --- a/src/server.py +++ b/src/server.py @@ -148,7 +148,9 @@ file_size = os.stat(fn).st_size print(f"[init:video] File size: {file_size} bytes (~{round(file_size/1024/1024, 2)} MB)") print("[init:video] Computing checksum... (might take some time)") -file_hash = hashlib.file_digest(open(fn, 'rb'), 'sha256').hexdigest() +with open(fn, 'rb') as f: + file_hash = hashlib.sha256(f.read()).hexdigest() +#file_hash = hashlib.file_digest(open(fn, 'rb'), 'sha256').hexdigest() print(f"[init:video] Checksum for the file: {file_hash}") total_chunk_amount = math.ceil(duration / chunk_length) @@ -222,291 +224,290 @@ def connection_processor(conn, addr): print(f"[{addr[0]}:{addr[1]} - parser] Received command: {' '.join(header)}") - match header[0]: - case "DEBUG": - with lock: - conn.send(f"DEBUG\nCHUNKS {str(chunk_queue)}\n".encode("UTF-8")) - case "HASH": - conn.send(f"HASH {file_hash}\n".encode("UTF-8")) - case "SIZE": - conn.send(f"SIZE {file_size}\n".encode("UTF-8")) - case "URL": - conn.send(f"URL {video_url}\n".encode("UTF-8")) - case "SCRIPT": - conn.send(f"SCRIPT {script_url}\n".encode("UTF-8")) - case "NEXT": - with lock: - next_chunk = chunk_queue.next_wanted() - if next_chunk: - new_worker = Worker(next_worker_id, addr) - next_worker_id += 1 + if header[0] == "DEBUG": + with lock: + conn.send(f"DEBUG\nCHUNKS {str(chunk_queue)}\n".encode("UTF-8")) + elif header[0] == "HASH": + conn.send(f"HASH {file_hash}\n".encode("UTF-8")) + elif header[0] == "SIZE": + conn.send(f"SIZE {file_size}\n".encode("UTF-8")) + elif header[0] == "URL": + conn.send(f"URL {video_url}\n".encode("UTF-8")) + elif header[0] == "SCRIPT": + conn.send(f"SCRIPT {script_url}\n".encode("UTF-8")) + elif header[0] == "NEXT": + with lock: + next_chunk = chunk_queue.next_wanted() + if next_chunk: + new_worker = Worker(next_worker_id, addr) + next_worker_id += 1 - next_chunk.add_worker(new_worker) + next_chunk.add_worker(new_worker) - conn.send(f"CHUNK {file_hash} {next_chunk.seq_id} {next_chunk.start} {next_chunk.length} {RECOMMENDED_PING_INTERVAL} {new_worker.id}\n".encode("UTF-8")) + conn.send(f"CHUNK {file_hash} {next_chunk.seq_id} {next_chunk.start} {next_chunk.length} {RECOMMENDED_PING_INTERVAL} {new_worker.id}\n".encode("UTF-8")) + else: + if chunk_queue.processing_finished(): + conn.send("FINISH\n".encode("UTF-8")) else: - if chunk_queue.processing_finished(): - conn.send("FINISH\n".encode("UTF-8")) - else: - conn.send(f"WAIT {CHUNK_CHECK_INTERVAL}") - case "PING": - if file_hash != header[1]: - print(f"[{addr[0]}:{addr[1]} - PING] Hash mismatch: expected {file_hash}, got {header[1]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if not header[2].isdigit(): - print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: not an int (got {header[2]} instead)") + conn.send(f"WAIT {CHUNK_CHECK_INTERVAL}\n".encode("UTF-8")) + elif header[0] == "PING": + if file_hash != header[1]: + print(f"[{addr[0]}:{addr[1]} - PING] Hash mismatch: expected {file_hash}, got {header[1]}") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[2].isdigit(): + print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: not an int (got {header[2]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[3].isdigit(): + print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + with lock: + chunk = chunk_queue.get_chunk_by_id(int(header[2])) + + if not chunk: + print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: no chunk with id {header[2]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - if not header[3].isdigit(): - print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: not an int (got {header[3]} instead)") + worker = chunk.get_worker_by_id(int(header[3])) + + if not worker: + print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: no worker with id {header[3]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - with lock: - chunk = chunk_queue.get_chunk_by_id(int(header[2])) + if worker.endpoint != addr: + print(f"[{addr[0]}:{addr[1]} - PING] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") + worker.endpoint = addr - if not chunk: - print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: no chunk with id {header[2]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break + worker.update_timestamp() - worker = chunk.get_worker_by_id(int(header[3])) + if chunk.done: + status = "done" + else: + status = "waiting" - if not worker: - print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: no worker with id {header[3]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break + conn.send(f"PONG {status}\n".encode("UTF-8")) + elif header[0] == "UPLOAD": + if file_hash != header[1]: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[2].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break - if worker.endpoint != addr: - print(f"[{addr[0]}:{addr[1]} - PING] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") - worker.endpoint = addr + if not header[3].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break - worker.update_timestamp() + if not header[4].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break - if chunk.done: - status = "done" - else: - status = "waiting" + with lock: + chunk = chunk_queue.get_chunk_by_id(int(header[2])) - conn.send(f"PONG {status}\n".encode("UTF-8")) - case "UPLOAD": - if file_hash != header[1]: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if not header[2].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)") + if not chunk: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - if not header[3].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)") + worker = chunk.get_worker_by_id(int(header[3])) + + if not worker: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - if not header[4].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)") + if worker.endpoint != addr: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") + worker.endpoint = addr + + worker.update_timestamp() + + conn.send(f"ACCEPT\n".encode("UTF-8")) + + remaining_data = int(header[4]) + conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK) + with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f: + while remaining_data: + new_data = conn.recv(remaining_data) + + if len(new_data) == 0: + connection_alive = False + conn.close() + break + + f.write(new_data) + remaining_data -= len(new_data) + + conn.settimeout(SERVER_SOCKET_TIMEOUT) + conn.send("ACCEPTED\n".encode("UTF-8")) + + open("ready.txt", "a").write(f"{header[2]}\n") + chunk.done = True + elif header[0] == "REUPLOAD": + if file_hash != header[1]: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[2].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[3].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[4].isdigit(): + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + with lock: + chunk = chunk_queue.get_chunk_by_id(int(header[2])) + + if not chunk: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - with lock: - chunk = chunk_queue.get_chunk_by_id(int(header[2])) + worker = chunk.get_worker_by_id(int(header[3])) - if not chunk: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - worker = chunk.get_worker_by_id(int(header[3])) - - if not worker: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if worker.endpoint != addr: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") - worker.endpoint = addr - - worker.update_timestamp() - - conn.send(f"ACCEPT\n".encode("UTF-8")) - - remaining_data = int(header[4]) - conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK) - with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f: - while remaining_data: - new_data = conn.recv(remaining_data) - - if len(new_data) == 0: - connection_alive = False - conn.close() - break - - f.write(new_data) - remaining_data -= len(new_data) - - conn.settimeout(SERVER_SOCKET_TIMEOUT) - conn.send("ACCEPTED\n".encode("UTF-8")) - - open("ready.txt", "a").write(f"{header[2]}\n") - chunk.done = True - case "REUPLOAD": - if file_hash != header[1]: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if not header[2].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)") + if not worker: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - if not header[3].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)") + if worker.endpoint != addr: + print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") + worker.endpoint = addr + + worker.update_timestamp() + + if os.path.exists(f"chunks/{header[2]}-{header[3]}.mkv"): + current_size = os.stat(f"chunks/{header[2]}-{header[3]}.mkv").st_size + else: + current_size = 0 + + conn.send(f"RESUME {current_size}\n".encode("UTF-8")) + + remaining_data = int(header[4]) - current_size + conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK) + with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f: + while remaining_data: + new_data = conn.recv(remaining_data) + + if len(new_data) == 0: + connection_alive = False + conn.close() + break + + f.write(new_data) + remaining_data -= len(new_data) + + conn.settimeout(SERVER_SOCKET_TIMEOUT) + conn.send("ACCEPTED\n".encode("UTF-8")) + + open("ready.txt", "a").write(f"{header[2]}\n") + chunk.done = True + elif header[0] == "ABORT": + if file_hash != header[1]: + print(f"[{addr[0]}:{addr[1]} - ABORT] Hash mismatch: expected {file_hash}, got {header[1]}") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[2].isdigit(): + print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: not an int (got {header[2]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + if not header[3].isdigit(): + print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: not an int (got {header[3]} instead)") + conn.send("RESET\n".encode("UTF-8")) + conn.close() + connection_alive = False + break + + with lock: + chunk = chunk_queue.get_chunk_by_id(int(header[2])) + + if not chunk: + print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: no chunk with id {header[2]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - if not header[4].isdigit(): - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)") + worker = chunk.get_worker_by_id(int(header[3])) + + if not worker: + print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: no worker with id {header[3]}") conn.send("RESET\n".encode("UTF-8")) conn.close() connection_alive = False break - with lock: - chunk = chunk_queue.get_chunk_by_id(int(header[2])) + if worker.endpoint != addr: + print(f"[{addr[0]}:{addr[1]} - ABORT] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") + worker.endpoint = addr - if not chunk: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - worker = chunk.get_worker_by_id(int(header[3])) - - if not worker: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if worker.endpoint != addr: - print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") - worker.endpoint = addr - - worker.update_timestamp() - - if os.path.exists(f"chunks/{header[2]}-{header[3]}.mkv"): - current_size = os.stat(f"chunks/{header[2]}-{header[3]}.mkv").st_size - else: - current_size = 0 - - conn.send(f"RESUME {current_size}\n".encode("UTF-8")) - - remaining_data = int(header[4]) - current_size - conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK) - with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f: - while remaining_data: - new_data = conn.recv(remaining_data) - - if len(new_data) == 0: - connection_alive = False - conn.close() - break - - f.write(new_data) - remaining_data -= len(new_data) - - conn.settimeout(SERVER_SOCKET_TIMEOUT) - conn.send("ACCEPTED\n".encode("UTF-8")) - - open("ready.txt", "a").write(f"{header[2]}\n") - chunk.done = True - case "ABORT": - if file_hash != header[1]: - print(f"[{addr[0]}:{addr[1]} - ABORT] Hash mismatch: expected {file_hash}, got {header[1]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if not header[2].isdigit(): - print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: not an int (got {header[2]} instead)") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if not header[3].isdigit(): - print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: not an int (got {header[3]} instead)") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - with lock: - chunk = chunk_queue.get_chunk_by_id(int(header[2])) - - if not chunk: - print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: no chunk with id {header[2]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - worker = chunk.get_worker_by_id(int(header[3])) - - if not worker: - print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: no worker with id {header[3]}") - conn.send("RESET\n".encode("UTF-8")) - conn.close() - connection_alive = False - break - - if worker.endpoint != addr: - print(f"[{addr[0]}:{addr[1]} - ABORT] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") - worker.endpoint = addr - - print(f"[{addr[0]}:{addr[1]} - ABORT] Removing worker {worker.id} from chunk {chunk.seq_id}, leaving chunk with {len(chunk.assigned_workers)} workers") - chunk.remove_worker(worker) - print(f"[{addr[0]}:{addr[1]} - ABORT] Updated chunk {chunk.seq_id}, it is now \"{chunk.status()}\"") - case _: - conn.send("UNSUPPORTED\n".encode("UTF-8")) + print(f"[{addr[0]}:{addr[1]} - ABORT] Removing worker {worker.id} from chunk {chunk.seq_id}, leaving chunk with {len(chunk.assigned_workers)} workers") + chunk.remove_worker(worker) + print(f"[{addr[0]}:{addr[1]} - ABORT] Updated chunk {chunk.seq_id}, it is now \"{chunk.status()}\"") + else: + conn.send("UNSUPPORTED\n".encode("UTF-8")) print(f"[init:network] Listening on {sys.argv[4]}")