From 59f2456978898c5e8c172e4ec3c31866ffdd3826 Mon Sep 17 00:00:00 2001 From: totaam Date: Fri, 21 Oct 2022 00:03:58 +0700 Subject: [PATCH] #3659 use dataclasses improve the code makes the file transfer class much more readable --- setup.py | 4 +- xpra/net/file_transfer.py | 209 ++++++++++++++++++++------------------ 2 files changed, 113 insertions(+), 100 deletions(-) diff --git a/setup.py b/setup.py index 9eedcf6fd4..1bb9038269 100755 --- a/setup.py +++ b/setup.py @@ -30,8 +30,8 @@ is_CentOS, is_AlmaLinux, is_RockyLinux, is_RedHat, is_openSUSE, is_OracleLinux, ) -if sys.version_info<(3, 6): - raise Exception("xpra no longer supports Python versions older than 3.6") +if sys.version_info<(3, 7): + raise Exception("xpra no longer supports Python versions older than 3.7") if BITS!=64: print(f"Warning: {BITS}-bit architecture, only 64-bits are officially supported") for _ in range(5): diff --git a/xpra/net/file_transfer.py b/xpra/net/file_transfer.py index ed71c0cfb9..925ee29eb6 100644 --- a/xpra/net/file_transfer.py +++ b/xpra/net/file_transfer.py @@ -9,9 +9,10 @@ import hashlib import uuid from time import monotonic +from dataclasses import dataclass from xpra.child_reaper import getChildReaper -from xpra.os_util import bytestostr, memoryview_to_bytes, umask_context, POSIX, WIN32 +from xpra.os_util import bytestostr, strtobytes, memoryview_to_bytes, umask_context, POSIX, WIN32 from xpra.util import typedict, csv, envint, envbool, engs, net_utf8, u from xpra.scripts.config import parse_bool, parse_with_unit from xpra.simple_stats import std_unit @@ -26,7 +27,6 @@ MAX_CONCURRENT_FILES = max(1, envint("XPRA_MAX_CONCURRENT_FILES", 10)) PRINT_JOB_TIMEOUT = max(60, envint("XPRA_PRINT_JOB_TIMEOUT", 3600)) SEND_REQUEST_TIMEOUT = max(300, envint("XPRA_SEND_REQUEST_TIMEOUT", 3600)) -ALWAYS_CHUNK = envbool("XPRA_FILE_ALWAYS_CHUNK", False) CHUNK_TIMEOUT = 10*1000 MIMETYPE_EXTS = { @@ -93,6 +93,30 @@ def safe_open_download_file(basefilename, mimetype): filelog(f"using filename 
{filename!r}, file descriptor={fd}") return filename, fd +@dataclass +class ReceiveChunkState: + start: float + fd: int + filename: str + mimetype: str + printit : bool + openit : bool + filesize: int + options: dict + digest: object + written: int + cancelled: bool + send_id: str + timer: int + chunk: str +@dataclass +class SendChunkState: + start: float + data: object + chunk_size: int + timer: int + chunk: int + class FileTransferAttributes: @@ -275,18 +299,18 @@ def _check_chunk_receiving(self, chunk_id, chunk_no): if not chunk_state: #transfer not found return - if chunk_state[-4]: + if chunk_state.cancelled: #transfer has been cancelled return - chunk_state[-2] = 0 #this timer has been used - if chunk_state[-1]==chunk_no: - filelog.error("Error: chunked file transfer '%s' timed out", chunk_id) + chunk_state.timer = 0 #this timer has been used + if chunk_state.chunk==chunk_no: + filelog.error(f"Error: chunked file transfer {chunk_id} timed out") self.receive_chunks_in_progress.pop(chunk_id, None) def cancel_download(self, send_id, message="Cancelled"): filelog("cancel_download(%s, %s)", send_id, message) for chunk_id, chunk_state in dict(self.receive_chunks_in_progress).items(): - if chunk_state[-3]==send_id: + if chunk_state.send_id==send_id: self.cancel_file(chunk_id, message) return filelog.error("Error: cannot cancel download %s, entry not found!", u(send_id)) @@ -296,63 +320,64 @@ def cancel_file(self, chunk_id, message, chunk=0): chunk_state = self.receive_chunks_in_progress.get(chunk_id) if chunk_state: #mark it as cancelled: - chunk_state[-4] = True - timer = chunk_state[-2] + chunk_state.cancelled = True + timer = chunk_state.timer if timer: - chunk_state[-2] = 0 + chunk_state.timer = 0 self.source_remove(timer) - fd = chunk_state[1] - osclose(fd) + osclose(chunk_state.fd) #remove this transfer after a little while, #so in-flight packets won't cause errors def clean_receive_state(): self.receive_chunks_in_progress.pop(chunk_id, None) return False 
self.timeout_add(20000, clean_receive_state) - filename = chunk_state[2] + filename = chunk_state.filename try: os.unlink(filename) except OSError as e: - filelog("os.unlink(%s)", filename, exc_info=True) + filelog(f"os.unlink({filename})", exc_info=True) filelog.error("Error: failed to delete temporary download file") - filelog.error(" '%s' : %s", filename, e) + filelog.error(f" {filename!r} : {e}") self.send("ack-file-chunk", chunk_id, False, message, chunk) def _process_send_file_chunk(self, packet): chunk_id, chunk, file_data, has_more = packet[1:5] chunk_id = net_utf8(chunk_id) + #if len(file_data)<1024: + # from xpra.os_util import hexstr + # filelog.warn("file_data=%s", hexstr(file_data)) + #filelog(f"file_data={len(file_data)} {type(file_data)}") + filelog(f"file_data={len(file_data)} {type(file_data)}") filelog("_process_send_file_chunk%s", (chunk_id, chunk, f"{len(file_data)} bytes", has_more)) chunk_state = self.receive_chunks_in_progress.get(chunk_id) if not chunk_state: - filelog.error("Error: cannot find the file transfer id '%r'", chunk_id) + filelog.error(f"Error: cannot find the file transfer id {chunk_id!r}") self.cancel_file(chunk_id, f"file transfer id {chunk_id!r} not found", chunk) return - if chunk_state[-4]: + if chunk_state.cancelled: filelog("got chunk for a cancelled file transfer, ignoring it") return def progress(position, error=None): - start = chunk_state[0] - send_id = chunk_state[-3] - filesize = chunk_state[6] - self.transfer_progress_update(False, send_id, monotonic()-start, position, filesize, error) - fd = chunk_state[1] - if chunk_state[-1]+1!=chunk: - filelog.error("Error: chunk number mismatch, expected %i but got %i", chunk_state[-1]+1, chunk) + elapsed = monotonic()-chunk_state.start + self.transfer_progress_update(False, chunk_state.send_id, elapsed, position, chunk_state.filesize, error) + fd = chunk_state.fd + if chunk_state.chunk+1!=chunk: + filelog.error("Error: chunk number mismatch, expected %i but got %i", 
chunk_state.chunk+1, chunk) self.cancel_file(chunk_id, "chunk number mismatch", chunk) osclose(fd) progress(-1, "chunk no mismatch") return - file_data = memoryview_to_bytes(file_data) + #this is for legacy packet encoders only: + if isinstance(file_data, str): + file_data = strtobytes(file_data) #update chunk number: - chunk_state[-1] = chunk - digest = chunk_state[8] - written = chunk_state[9] + chunk_state.chunk = chunk try: os.write(fd, file_data) - if digest: - digest.update(file_data) - written += len(file_data) - chunk_state[9] = written + if chunk_state.digest: + chunk_state.digest.update(file_data) + chunk_state.written += len(file_data) except OSError as e: filelog.error("Error: cannot write file chunk") filelog.estr(e) @@ -360,49 +385,47 @@ def progress(position, error=None): osclose(fd) progress(-1, f"write error ({e}") return - filesize = chunk_state[6] - if written>filesize: + if chunk_state.written>chunk_state.filesize: filelog.error("Error: too much data received") progress(-1, "file data size mismatch") return self.send("ack-file-chunk", chunk_id, True, "", chunk) - if chunk_state[-4]: + if chunk_state.cancelled: #check again if the transfer has been cancelled filelog("got chunk for a cancelled file transfer, ignoring it") return if has_more: - progress(written) - timer = chunk_state[-2] - if timer: - self.source_remove(timer) + progress(chunk_state.written) + if chunk_state.timer: + self.source_remove(chunk_state.timer) #remote end will send more after receiving the ack - timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk) - chunk_state[-2] = timer + chunk_state.timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk) + filelog("waiting for the next chunk, got %8i of %8i: %3i%%", + chunk_state.written, chunk_state.filesize, 100*chunk_state.written/chunk_state.filesize) return #we have received all the packets self.receive_chunks_in_progress.pop(chunk_id, None) osclose(fd) - if digest: 
- options = chunk_state[7] - expected_digest = options.strget(digest.name) #ie: "sha256" - filename = chunk_state[2] - if expected_digest and digest.hexdigest()!=expected_digest: + filename = chunk_state.filename + options = chunk_state.options + filelog(f"file {filename!r} complete") + if chunk_state.digest: + expected_digest = options.strget(chunk_state.digest.name) #ie: "sha256" + if expected_digest and chunk_state.digest.hexdigest()!=expected_digest: progress(-1, "checksum mismatch") - self.digest_mismatch(filename, digest, expected_digest) + self.digest_mismatch(filename, chunk_state.digest, expected_digest) return - filelog("%s digest matches: %s", digest.name, expected_digest) + filelog("%s digest matches: %s", chunk_state.digest.name, expected_digest) #check file size and digest then process it: - filename, mimetype, printit, openit, filesize, options = chunk_state[2:8] - if written!=filesize: - filelog.error("Error: expected a file of %i bytes, got %i", filesize, written) + if chunk_state.written!=chunk_state.filesize: + filelog.error("Error: expected a file of %i bytes, got %i", chunk_state.filesize, chunk_state.written) progress(-1, "file size mismatch") return - progress(written) - start_time = chunk_state[0] - elapsed = monotonic()-start_time - mimetype = bytestostr(mimetype) - filelog("%i bytes received in %i chunks, took %ims", filesize, chunk, elapsed*1000) - self.process_downloaded_file(filename, mimetype, printit, openit, filesize, options) + progress(chunk_state.written) + elapsed = monotonic()-chunk_state.start + filelog("%i bytes received in %i chunks, took %ims", chunk_state.filesize, chunk, elapsed*1000) + self.process_downloaded_file(filename, chunk_state.mimetype, + chunk_state.printit, chunk_state.openit, chunk_state.filesize, options) def accept_data(self, send_id, dtype, basefilename, printit, openit): #subclasses should check the flags, @@ -491,15 +514,12 @@ def _process_send_file(self, packet): osclose(fd) return timer = 
self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk) - chunk_state = [ - monotonic(), - fd, filename, mimetype, - printit, openit, filesize, - options, digest, 0, False, send_id, - timer, chunk, - ] - self.receive_chunks_in_progress[chunk_id] = chunk_state - self.send("ack-file-chunk", chunk_id, True, "", chunk) + self.receive_chunks_in_progress[chunk_id] = ReceiveChunkState(monotonic(), + fd, filename, mimetype, + printit, openit, filesize, + options, digest, 0, False, send_id, + timer, chunk) + self.send("ack-file-chunk", chunk_id, True, b"", chunk) return #not chunked, full file: if not file_data: @@ -660,19 +680,13 @@ def open_done(*_args): cr = getChildReaper() cr.add_process(proc, f"Open file {url}", command, True, True, open_done) - def file_size_warning(self, action, location, basefilename, filesize, limit=0): - filelog.warn(f"Warning: cannot {action} the file {basefilename!r}") - if filesize<=0: - filelog.warn(" this file is empty") - else: - filelog.warn(" this file is too large: %sB", std_unit(filesize)) - filelog.warn(f" the {location} file size limit is %sB", std_unit(limit)) + def file_size_warning(self, action, location, basefilename, filesize, limit): + filelog.warn("Warning: cannot %s the file '%s'", action, basefilename) + filelog.warn(" this file is too large: %sB", std_unit(filesize)) + filelog.warn(" the %s file size limit is %sB", location, std_unit(limit)) def check_file_size(self, action, filename, filesize): basefilename = os.path.basename(filename) - if filesize<=0: - self.file_size_warning(action, "n/a", basefilename, filesize) - return False if filesize>self.file_size_limit: self.file_size_warning(action, "local", basefilename, filesize, self.file_size_limit) return False @@ -902,7 +916,7 @@ def do_send_file(self, filename, mimetype, data, filesize=0, printit=False, open options = options or {} options["sha256"] = h.hexdigest() chunk_size = min(self.file_chunks, self.remote_file_chunks) - if 
0<chunk_size<filesize or ALWAYS_CHUNK: + if 0<chunk_size<filesize: in_progress = len(self.send_chunks_in_progress) if in_progress>=MAX_CONCURRENT_FILES: raise Exception(f"too many file transfers in progress: {in_progress}") @@ -912,9 +926,8 @@ def do_send_file(self, filename, mimetype, data, filesize=0, printit=False, open #timer to check that the other end is requesting more chunks: chunk_no = 0 timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk_no) - chunk_state = [monotonic(), data, chunk_size, timer, chunk_no] - self.send_chunks_in_progress[chunk_id] = chunk_state - cdata = "" + self.send_chunks_in_progress[chunk_id] = SendChunkState(monotonic(), data, chunk_size, timer, chunk_no) + cdata = b"" filelog("using chunks, sending initial file-chunk-id=%s, for chunk size=%s", chunk_id, chunk_size) else: @@ -932,10 +945,10 @@ def _check_chunk_sending(self, chunk_id, chunk_no): if not chunk_state: #transfer already removed return - chunk_state[3] = 0 #timer has fired - if chunk_state[-1]==chunk_no: - filelog.error("Error: chunked file transfer '%s' timed out", chunk_id) - filelog.error(" on chunk %i", chunk_no) + chunk_state.timer = 0 #timer has fired + if chunk_state.chunk==chunk_no: + filelog.error(f"Error: chunked file transfer {chunk_id} timed out") + filelog.error(f" on chunk {chunk_no}") self.cancel_sending(chunk_id) def cancel_sending(self, chunk_id): @@ -943,9 +956,9 @@ def cancel_sending(self, chunk_id): filelog("cancel_sending(%s) chunk state found: %s", chunk_id, bool(chunk_state)) if not chunk_state: return - timer = chunk_state[3] + timer = chunk_state.timer if timer: - chunk_state[3] = 0 + chunk_state.timer = 0 self.source_remove(timer) def _process_ack_file_chunk(self, packet): @@ -963,28 +976,28 @@ def _process_ack_file_chunk(self, packet): if not chunk_state: filelog.error(f"Error: cannot find the file transfer id {chunk_id!r}") return - if chunk_state[-1]!=chunk: - filelog.error("Error: chunk number mismatch (%i vs %i)", chunk_state[-1], chunk) + if chunk_state.chunk!=chunk: + filelog.error("Error: chunk number mismatch (%i vs %i)", 
chunk_state.chunk, chunk) self.cancel_sending(chunk_id) return - start_time, data, chunk_size, timer, chunk = chunk_state - if not data: + chunk_size = chunk_state.chunk_size + if not chunk_state.data: #all sent! - elapsed = monotonic()-start_time + elapsed = monotonic()-chunk_state.start filelog("%i chunks of %i bytes sent in %ims (%sB/s)", chunk, chunk_size, elapsed*1000, std_unit(chunk*chunk_size/elapsed)) self.cancel_sending(chunk_id) return assert chunk_size>0 #carve out another chunk: - cdata = self.compressed_wrapper("file-data", data[:chunk_size]) - data = data[chunk_size:] + cdata = self.compressed_wrapper("file-data", chunk_state.data[:chunk_size]) + chunk_state.data = chunk_state.data[chunk_size:] chunk += 1 - if timer: - self.source_remove(timer) - timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk) - self.send_chunks_in_progress[chunk_id] = [start_time, data, chunk_size, timer, chunk] - self.send("send-file-chunk", chunk_id, chunk, cdata, bool(data)) + chunk_state.chunk = chunk + if chunk_state.timer: + self.source_remove(chunk_state.timer) + chunk_state.timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk) + self.send("send-file-chunk", chunk_id, chunk, cdata, bool(chunk_state.data)) def send(self, *parts): raise NotImplementedError()