improve backward compatibility and fix minor bugs

This commit is contained in:
dymik739 2023-08-25 12:28:39 +03:00
parent cb8bc31517
commit 0097e71e29
1 changed files with 235 additions and 234 deletions

View File

@ -148,7 +148,9 @@ file_size = os.stat(fn).st_size
print(f"[init:video] File size: {file_size} bytes (~{round(file_size/1024/1024, 2)} MB)") print(f"[init:video] File size: {file_size} bytes (~{round(file_size/1024/1024, 2)} MB)")
print("[init:video] Computing checksum... (might take some time)") print("[init:video] Computing checksum... (might take some time)")
file_hash = hashlib.file_digest(open(fn, 'rb'), 'sha256').hexdigest() with open(fn, 'rb') as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
#file_hash = hashlib.file_digest(open(fn, 'rb'), 'sha256').hexdigest()
print(f"[init:video] Checksum for the file: {file_hash}") print(f"[init:video] Checksum for the file: {file_hash}")
total_chunk_amount = math.ceil(duration / chunk_length) total_chunk_amount = math.ceil(duration / chunk_length)
@ -222,291 +224,290 @@ def connection_processor(conn, addr):
print(f"[{addr[0]}:{addr[1]} - parser] Received command: {' '.join(header)}") print(f"[{addr[0]}:{addr[1]} - parser] Received command: {' '.join(header)}")
match header[0]: if header[0] == "DEBUG":
case "DEBUG": with lock:
with lock: conn.send(f"DEBUG\nCHUNKS {str(chunk_queue)}\n".encode("UTF-8"))
conn.send(f"DEBUG\nCHUNKS {str(chunk_queue)}\n".encode("UTF-8")) elif header[0] == "HASH":
case "HASH": conn.send(f"HASH {file_hash}\n".encode("UTF-8"))
conn.send(f"HASH {file_hash}\n".encode("UTF-8")) elif header[0] == "SIZE":
case "SIZE": conn.send(f"SIZE {file_size}\n".encode("UTF-8"))
conn.send(f"SIZE {file_size}\n".encode("UTF-8")) elif header[0] == "URL":
case "URL": conn.send(f"URL {video_url}\n".encode("UTF-8"))
conn.send(f"URL {video_url}\n".encode("UTF-8")) elif header[0] == "SCRIPT":
case "SCRIPT": conn.send(f"SCRIPT {script_url}\n".encode("UTF-8"))
conn.send(f"SCRIPT {script_url}\n".encode("UTF-8")) elif header[0] == "NEXT":
case "NEXT": with lock:
with lock: next_chunk = chunk_queue.next_wanted()
next_chunk = chunk_queue.next_wanted() if next_chunk:
if next_chunk: new_worker = Worker(next_worker_id, addr)
new_worker = Worker(next_worker_id, addr) next_worker_id += 1
next_worker_id += 1
next_chunk.add_worker(new_worker) next_chunk.add_worker(new_worker)
conn.send(f"CHUNK {file_hash} {next_chunk.seq_id} {next_chunk.start} {next_chunk.length} {RECOMMENDED_PING_INTERVAL} {new_worker.id}\n".encode("UTF-8")) conn.send(f"CHUNK {file_hash} {next_chunk.seq_id} {next_chunk.start} {next_chunk.length} {RECOMMENDED_PING_INTERVAL} {new_worker.id}\n".encode("UTF-8"))
else:
if chunk_queue.processing_finished():
conn.send("FINISH\n".encode("UTF-8"))
else: else:
if chunk_queue.processing_finished(): conn.send(f"WAIT {CHUNK_CHECK_INTERVAL}\n".encode("UTF-8"))
conn.send("FINISH\n".encode("UTF-8")) elif header[0] == "PING":
else: if file_hash != header[1]:
conn.send(f"WAIT {CHUNK_CHECK_INTERVAL}") print(f"[{addr[0]}:{addr[1]} - PING] Hash mismatch: expected {file_hash}, got {header[1]}")
case "PING": conn.send("RESET\n".encode("UTF-8"))
if file_hash != header[1]: conn.close()
print(f"[{addr[0]}:{addr[1]} - PING] Hash mismatch: expected {file_hash}, got {header[1]}") connection_alive = False
conn.send("RESET\n".encode("UTF-8")) break
conn.close()
connection_alive = False if not header[2].isdigit():
break print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8"))
if not header[2].isdigit(): conn.close()
print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: not an int (got {header[2]} instead)") connection_alive = False
break
if not header[3].isdigit():
print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
with lock:
chunk = chunk_queue.get_chunk_by_id(int(header[2]))
if not chunk:
print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: no chunk with id {header[2]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
if not header[3].isdigit(): worker = chunk.get_worker_by_id(int(header[3]))
print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: not an int (got {header[3]} instead)")
if not worker:
print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
with lock: if worker.endpoint != addr:
chunk = chunk_queue.get_chunk_by_id(int(header[2])) print(f"[{addr[0]}:{addr[1]} - PING] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
if not chunk: worker.update_timestamp()
print(f"[{addr[0]}:{addr[1]} - PING] invalid seq_id: no chunk with id {header[2]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
worker = chunk.get_worker_by_id(int(header[3])) if chunk.done:
status = "done"
else:
status = "waiting"
if not worker: conn.send(f"PONG {status}\n".encode("UTF-8"))
print(f"[{addr[0]}:{addr[1]} - PING] invalid worker_id: no worker with id {header[3]}") elif header[0] == "UPLOAD":
conn.send("RESET\n".encode("UTF-8")) if file_hash != header[1]:
conn.close() print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}")
connection_alive = False conn.send("RESET\n".encode("UTF-8"))
break conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if worker.endpoint != addr: if not header[3].isdigit():
print(f"[{addr[0]}:{addr[1]} - PING] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})") print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)")
worker.endpoint = addr conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
worker.update_timestamp() if not header[4].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if chunk.done: with lock:
status = "done" chunk = chunk_queue.get_chunk_by_id(int(header[2]))
else:
status = "waiting"
conn.send(f"PONG {status}\n".encode("UTF-8")) if not chunk:
case "UPLOAD": print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}")
if file_hash != header[1]:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
if not header[3].isdigit(): worker = chunk.get_worker_by_id(int(header[3]))
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)")
if not worker:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
if not header[4].isdigit(): if worker.endpoint != addr:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)") print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
worker.update_timestamp()
conn.send(f"ACCEPT\n".encode("UTF-8"))
remaining_data = int(header[4])
conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK)
with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f:
while remaining_data:
new_data = conn.recv(remaining_data)
if len(new_data) == 0:
connection_alive = False
conn.close()
break
f.write(new_data)
remaining_data -= len(new_data)
conn.settimeout(SERVER_SOCKET_TIMEOUT)
conn.send("ACCEPTED\n".encode("UTF-8"))
open("ready.txt", "a").write(f"{header[2]}\n")
chunk.done = True
elif header[0] == "REUPLOAD":
if file_hash != header[1]:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[3].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[4].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
with lock:
chunk = chunk_queue.get_chunk_by_id(int(header[2]))
if not chunk:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
with lock: worker = chunk.get_worker_by_id(int(header[3]))
chunk = chunk_queue.get_chunk_by_id(int(header[2]))
if not chunk: if not worker:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
worker = chunk.get_worker_by_id(int(header[3]))
if not worker:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if worker.endpoint != addr:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
worker.update_timestamp()
conn.send(f"ACCEPT\n".encode("UTF-8"))
remaining_data = int(header[4])
conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK)
with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f:
while remaining_data:
new_data = conn.recv(remaining_data)
if len(new_data) == 0:
connection_alive = False
conn.close()
break
f.write(new_data)
remaining_data -= len(new_data)
conn.settimeout(SERVER_SOCKET_TIMEOUT)
conn.send("ACCEPTED\n".encode("UTF-8"))
open("ready.txt", "a").write(f"{header[2]}\n")
chunk.done = True
case "REUPLOAD":
if file_hash != header[1]:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] Hash mismatch: expected {file_hash}, got {header[1]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
if not header[3].isdigit(): if worker.endpoint != addr:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: not an int (got {header[3]} instead)") print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
worker.update_timestamp()
if os.path.exists(f"chunks/{header[2]}-{header[3]}.mkv"):
current_size = os.stat(f"chunks/{header[2]}-{header[3]}.mkv").st_size
else:
current_size = 0
conn.send(f"RESUME {current_size}\n".encode("UTF-8"))
remaining_data = int(header[4]) - current_size
conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK)
with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f:
while remaining_data:
new_data = conn.recv(remaining_data)
if len(new_data) == 0:
connection_alive = False
conn.close()
break
f.write(new_data)
remaining_data -= len(new_data)
conn.settimeout(SERVER_SOCKET_TIMEOUT)
conn.send("ACCEPTED\n".encode("UTF-8"))
open("ready.txt", "a").write(f"{header[2]}\n")
chunk.done = True
elif header[0] == "ABORT":
if file_hash != header[1]:
print(f"[{addr[0]}:{addr[1]} - ABORT] Hash mismatch: expected {file_hash}, got {header[1]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[3].isdigit():
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
with lock:
chunk = chunk_queue.get_chunk_by_id(int(header[2]))
if not chunk:
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: no chunk with id {header[2]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
if not header[4].isdigit(): worker = chunk.get_worker_by_id(int(header[3]))
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid chunk_size: not an int (got {header[3]} instead)")
if not worker:
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8")) conn.send("RESET\n".encode("UTF-8"))
conn.close() conn.close()
connection_alive = False connection_alive = False
break break
with lock: if worker.endpoint != addr:
chunk = chunk_queue.get_chunk_by_id(int(header[2])) print(f"[{addr[0]}:{addr[1]} - ABORT] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
if not chunk: print(f"[{addr[0]}:{addr[1]} - ABORT] Removing worker {worker.id} from chunk {chunk.seq_id}, leaving chunk with {len(chunk.assigned_workers)} workers")
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid seq_id: no chunk with id {header[2]}") chunk.remove_worker(worker)
conn.send("RESET\n".encode("UTF-8")) print(f"[{addr[0]}:{addr[1]} - ABORT] Updated chunk {chunk.seq_id}, it is now \"{chunk.status()}\"")
conn.close() else:
connection_alive = False conn.send("UNSUPPORTED\n".encode("UTF-8"))
break
worker = chunk.get_worker_by_id(int(header[3]))
if not worker:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if worker.endpoint != addr:
print(f"[{addr[0]}:{addr[1]} - UPLOAD] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
worker.update_timestamp()
if os.path.exists(f"chunks/{header[2]}-{header[3]}.mkv"):
current_size = os.stat(f"chunks/{header[2]}-{header[3]}.mkv").st_size
else:
current_size = 0
conn.send(f"RESUME {current_size}\n".encode("UTF-8"))
remaining_data = int(header[4]) - current_size
conn.settimeout(CONNECTION_TIMEOUT_DURING_LOCK)
with open(f"chunks/{header[2]}-{header[3]}.mkv", "wb") as f:
while remaining_data:
new_data = conn.recv(remaining_data)
if len(new_data) == 0:
connection_alive = False
conn.close()
break
f.write(new_data)
remaining_data -= len(new_data)
conn.settimeout(SERVER_SOCKET_TIMEOUT)
conn.send("ACCEPTED\n".encode("UTF-8"))
open("ready.txt", "a").write(f"{header[2]}\n")
chunk.done = True
case "ABORT":
if file_hash != header[1]:
print(f"[{addr[0]}:{addr[1]} - ABORT] Hash mismatch: expected {file_hash}, got {header[1]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[2].isdigit():
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: not an int (got {header[2]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if not header[3].isdigit():
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: not an int (got {header[3]} instead)")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
with lock:
chunk = chunk_queue.get_chunk_by_id(int(header[2]))
if not chunk:
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid seq_id: no chunk with id {header[2]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
worker = chunk.get_worker_by_id(int(header[3]))
if not worker:
print(f"[{addr[0]}:{addr[1]} - ABORT] invalid worker_id: no worker with id {header[3]}")
conn.send("RESET\n".encode("UTF-8"))
conn.close()
connection_alive = False
break
if worker.endpoint != addr:
print(f"[{addr[0]}:{addr[1]} - ABORT] Warning: worker {worker.id} for chunk {chunk.seq_id} changed it's endpoint ({worker.endpoint[0]}:{worker.endpoint[1]} -> {addr[0]}:{addr[1]})")
worker.endpoint = addr
print(f"[{addr[0]}:{addr[1]} - ABORT] Removing worker {worker.id} from chunk {chunk.seq_id}, leaving chunk with {len(chunk.assigned_workers)} workers")
chunk.remove_worker(worker)
print(f"[{addr[0]}:{addr[1]} - ABORT] Updated chunk {chunk.seq_id}, it is now \"{chunk.status()}\"")
case _:
conn.send("UNSUPPORTED\n".encode("UTF-8"))
print(f"[init:network] Listening on {sys.argv[4]}") print(f"[init:network] Listening on {sys.argv[4]}")