initial commit

This commit is contained in:
2023-08-25 12:42:22 +03:00
commit c39b98abe0
3 changed files with 631 additions and 0 deletions
+394
View File
@@ -0,0 +1,394 @@
# Official implementation of the Distributed Video Transcoding Protocol (DVTP) client
# Copyright (C) 2023 Dymik739
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Email: user@109.86.70.81
import os
import sys
import time
import socket
import shutil
import inspect
import hashlib
import datetime
import subprocess
CONNECTION_TIMEOUT = 15
WAIT_BEFORE_RECONNECTING = 5
SILENT_RENDER_PROCESS = False
if len(sys.argv) > 2:
for i in sys.argv[2:]:
if i == "--silent-render":
SILENT_RENDER_PROCESS = True
def log(msg, color = ""):
# it's black magic time!
current_caller = inspect.stack()[1]
try:
class_name = current_caller[0].f_locals["self"].__class__.__name__
except:
class_name = "__main__"
try:
method_name = current_caller.function
except:
method_name = "main"
# okay, back to being normal
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
color_clear = "\033[0m"
print(f"{color}[{current_time} {class_name}.{method_name}] {msg}{color_clear}")
def error(msg):
color_red = "\033[31m"
log("Error: " + msg, color_red)
def warn(msg):
color_yellow = "\033[33m"
log("Warning: " + msg, color_yellow)
class ConnectionManager:
def __init__(self, addr):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addr = tuple(addr)
self.databuffer = bytes()
self.socket.connect(self.addr)
self.socket.settimeout(CONNECTION_TIMEOUT)
def reconnect(self):
try:
self.socket.close()
except:
pass
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self.addr)
self.socket.settimeout(CONNECTION_TIMEOUT)
log("Reconnection successful")
return True
except Exception as e:
error(f"Failed to reconnect to target {self.addr[0]}:{self.addr[1]} (exception \"{e}\")")
return False
def send(self, data, raw = False):
try:
if raw:
length_left_to_send = len(data)
while length_left_to_send:
length_left_to_send -= self.socket.send(data[-length_left_to_send:])
else:
self.socket.send(data.encode("UTF-8"))
return True
except KeyboardInterrupt as e:
exit(0)
except:
error(f"Could not send data: connection loss detected; reconnecting after {WAIT_BEFORE_RECONNECTING} seconds...")
connected = False
while not connected:
time.sleep(WAIT_BEFORE_RECONNECTING)
connected = self.reconnect()
return False
def fsend(self, data, raw = False):
if raw:
log(f"Sending raw data ({len(data)} bytes)")
else:
log(f"Sending {data}")
sent = False
while not sent:
sent = self.send(data, raw)
def recv(self):
try:
self.databuffer += self.socket.recv(64)
return True
except:
error(f"Could not receive data: connection loss detected; reconnecting after {WAIT_BEFORE_RECONNECTING} seconds...")
connected = False
while not connected:
time.sleep(WAIT_BEFORE_RECONNECTING)
connected = self.reconnect()
return False
def receive_message(self):
try:
while bytes("\n", "UTF-8") not in self.databuffer:
if not self.recv():
return False
raw_header = self.databuffer.split("\n".encode('UTF-8'))[0]
self.databuffer = self.databuffer[len(raw_header)+1:]
message = raw_header.decode("UTF-8").split()
log(f"Debug: {message}")
if len(message) == 0:
warn("Got message of length 0")
return message
except Exception as e:
warn(f"Got exception \"{e}\"")
return False
def talk(self, data, raw = False):
'''
Wrapper for sending and receiving data that ensures the two-way communication is finished correctly.
'''
while True:
self.fsend(data, raw)
message = self.receive_message()
if not message:
continue
else:
return message
def ping(self, file_hash, seq_id, worker_id):
message = self.talk(f"PING {file_hash} {seq_id} {worker_id}\n")
return message[1] == 'done'
def next(self):
message = self.talk("NEXT\n")
''' not backwards compatible
match message[0]:
case "CHUNK":
return {'type': 'chunk',
'sha256_sum': message[1],
'seq_id': message[2],
'start': message[3],
'duration': message[4],
'ping_interval': float(message[5]),
'worker_id': message[6]}
case "WAIT":
return {'type': 'wait',
'delay': float(message[1])}
case "FINISH":
return {'type': 'finish'}
'''
if message[0] == "CHUNK":
return {'type': 'chunk',
'sha256_sum': message[1],
'seq_id': message[2],
'start': message[3],
'duration': message[4],
'ping_interval': float(message[5]),
'worker_id': message[6]}
elif message[0] == "WAIT":
return {'type': 'wait',
'delay': float(message[1])}
elif message[0] == "FINISH":
return {'type': 'finish'}
def upload(self, file, file_hash, seq_id, worker_id):
message = self.talk(f"UPLOAD {file_hash} {seq_id} {worker_id} {os.stat(file).st_size}\n")
if message[0] == 'ACCEPT':
log(f"Server accepts chunk {seq_id}-{worker_id}.mkv, uploading...")
responce = self.talk(open(file, 'rb').read(), True)
log(f"Succesfully uploaded chunk {seq_id}-{worker_id}.mkv, responce: {responce[0]}")
elif message[0] == 'DISCARD':
warn(f"Server discarded chunk {seq_id}-{worker_id}.mkv")
def get_file_hash(self):
return self.talk("HASH\n")[1]
def get_file_size(self):
return self.talk("SIZE\n")[1]
def get_video_url(self):
return self.talk("URL\n")[1]
def get_script_url(self):
return self.talk("SCRIPT\n")[1]
class CacheManager:
def __init__(self, file_hash, file_size, file_url, script_url):
log("Preparing cache...")
self.required_file_hash = file_hash
self.required_file_size = int(file_size)
self.required_file_url = file_url
self.required_file_name = file_url.rsplit("/", 1)[1]
self.script_url = script_url
self.script_name = script_url.rsplit("/", 1)[1]
#self.db = self.build_local_cache_db()
self.target_file = self.find_file_by_hash_and_size(file_hash, file_size)
if not self.target_file:
free_space = shutil.disk_usage("cache/").free
if free_space <= self.required_file_size:
error(f"Not enough space: required {file_size} bytes, available {free_space}")
exit(1)
log("Downloading required file...")
os.system(f"cd cache/ && wget -c {file_url}")
#fetched_file_hash = hashlib.file_digest(open("cache/" + self.required_file_name, 'rb'), 'sha256').hexdigest()
fetched_file_hash = self.file_digest_sha256("cache/" + self.required_file_name)
if fetched_file_hash != self.required_file_hash:
error(f"Hash mismatch: (local) {fetched_file_hash} != {self.required_file_hash} (remote)")
error("Failed to download remote file, exiting.")
exit(1)
self.target_file = self.required_file_name
os.system(f"cd scripts/ && wget -c {script_url}")
log("Cache is ready")
def find_file_by_hash_and_size(self, file_hash, file_size):
log(f"Searching for file with hash {file_hash} and size {file_size}")
for filename in os.listdir("cache/"):
size = os.stat(f"cache/{filename}").st_size
if size != int(file_size):
log(f"{filename} - size mismatch: (local) {size} != {file_size} (remote)")
continue
else:
log(f"{filename} has correct size, checking hash...")
#checksum = hashlib.file_digest(open(f"cache/{filename}", 'rb'), 'sha256').hexdigest()
checksum = self.file_digest_sha256(f"cache/{filename}")
log(f"{filename}: {checksum}")
if checksum == file_hash:
log(f"Found required file: {filename}")
return filename
else:
log("File does not exist")
return None
def get_required_file_name(self):
return "cache/" + self.target_file
def get_required_script_name(self):
return "scripts/" + self.script_name
def file_digest_sha256(self, filename):
'''
Compatibility layer
'''
with open(filename, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
class RenderManager:
def __init__(self):
self.render_subprocess = None
self.start = None
self.duration = None
self.chunk_name = None
def running(self):
try:
return self.render_subprocess.poll() == None
except:
return False
def assign(self, seq_id, worker_id, start, duration):
global cache_manager
for i in os.listdir("temp/"):
os.system(f"rm temp/{i}")
self.chunk_name = f"temp/{seq_id}-{worker_id}.mkv"
self.file_name = cache_manager.get_required_file_name()
self.script_name = cache_manager.get_required_script_name()
self.start = start
self.duration = duration
if SILENT_RENDER_PROCESS:
self.render_subprocess = subprocess.Popen(["sh", self.script_name, self.file_name,
self.start, self.duration, self.chunk_name], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
self.render_subprocess = subprocess.Popen(["sh", self.script_name, self.file_name,
self.start, self.duration, self.chunk_name], stdin=subprocess.PIPE)
def stop(self):
log("Cancelling render job")
self.render_subprocess.communicate(input='q')
self.render_subprocess.wait()
def get_chunk_name(self):
return self.chunk_name
for i in ['cache/', 'scripts/', 'temp/']:
if not os.path.exists(i):
os.mkdir(i)
remote_addr = sys.argv[1].split(":")
remote_addr[1] = int(remote_addr[1])
conn_manager = ConnectionManager(remote_addr)
file_hash = conn_manager.get_file_hash()
file_size = conn_manager.get_file_size()
file_url = conn_manager.get_video_url()
script_url = conn_manager.get_script_url()
cache_manager = CacheManager(file_hash, file_size, file_url, script_url)
render_manager = RenderManager()
while True:
log("Requesting new chunk...")
new_chunk = conn_manager.next()
if new_chunk['type'] == 'finish':
log("Job has been completed, exiting")
exit(0)
elif new_chunk['type'] == 'wait':
log(f"No chunks available yet, waiting for {new_chunk['delay']} seconds...")
time.sleep(new_chunk['delay'])
continue
# assuming (else new_chunk['type'] == 'chunk') to be always true
log(f"Got assigned to chunk with seq_id={new_chunk['seq_id']} as worker_id={new_chunk['worker_id']}")
render_manager.assign(new_chunk['seq_id'], new_chunk['worker_id'], new_chunk['start'], new_chunk['duration'])
upload_required = True
while render_manager.running():
time.sleep(new_chunk['ping_interval'])
log("Pinging the server...")
if conn_manager.ping(file_hash, new_chunk['seq_id'], new_chunk['worker_id']):
warn("Current chunk has been rendered by other client, getting new chunk...")
upload_required = False
render_manager.stop()
else:
log("Server wants current chunk, rendering...")
if not upload_required:
warn("Skipping upload")
continue
log("Uploading rendered file...")
file_to_upload = render_manager.get_chunk_name()
conn_manager.upload(file_to_upload, file_hash, new_chunk['seq_id'], new_chunk['worker_id'])