From b9cc33022d80f569c1f39c5f9ac02d4106aa947c Mon Sep 17 00:00:00 2001 From: anasty17 Date: Tue, 23 Jan 2024 06:24:19 +0200 Subject: [PATCH] Bug fix - Fix jdownloader google drive link when token not exists - Fix convert media while mirror 1 file only - Fix seed when convert media enabled - Fix upload old files that have been converted Signed-off-by: anasty17 --- bot/helper/common.py | 159 ++++++++++++------ bot/helper/ext_utils/media_utils.py | 47 +++--- bot/helper/listeners/aria2_listener.py | 4 + bot/helper/listeners/direct_listener.py | 51 +++--- bot/helper/listeners/task_listener.py | 71 ++++---- .../download_utils/direct_downloader.py | 6 +- .../download_utils/gd_download.py | 6 +- .../download_utils/jd_download.py | 6 +- .../download_utils/rclone_download.py | 4 +- .../download_utils/telegram_download.py | 10 +- .../download_utils/yt_dlp_download.py | 15 +- bot/helper/mirror_utils/gdrive_utils/clone.py | 8 +- .../mirror_utils/gdrive_utils/upload.py | 15 +- .../mirror_utils/rclone_utils/transfer.py | 13 +- .../status_utils/direct_status.py | 6 +- .../status_utils/extract_status.py | 6 +- .../status_utils/gdrive_status.py | 6 +- .../status_utils/media_convert_status.py | 6 +- .../mirror_utils/status_utils/queue_status.py | 6 +- .../status_utils/sample_video_status.py | 6 +- .../mirror_utils/status_utils/split_status.py | 6 +- .../status_utils/telegram_status.py | 6 +- .../mirror_utils/status_utils/zip_status.py | 6 +- bot/helper/mirror_utils/telegram_uploader.py | 26 +-- bot/modules/clone.py | 14 +- bot/modules/mirror_leech.py | 4 +- 26 files changed, 285 insertions(+), 228 deletions(-) diff --git a/bot/helper/common.py b/bot/helper/common.py index 7a6f300c7ec..6353e911fb4 100644 --- a/bot/helper/common.py +++ b/bot/helper/common.py @@ -1,8 +1,9 @@ -from aiofiles.os import path as aiopath, remove +from aiofiles.os import path as aiopath, remove, makedirs from asyncio import sleep, create_subprocess_exec from asyncio.subprocess import PIPE from os import walk, path as ospath from secrets import token_urlsafe +from aioshutil import move, copy2 from bot import ( DOWNLOAD_DIR, @@ -78,6 +79,7 @@ def __init__(self): self.splitSize = 0 self.maxSplitSize = 0 self.multi = 0 + self.size = 0 self.isLeech = False self.isQbit = False self.isJd = False @@ -103,6 +105,7 @@ def __init__(self): self.forceRun = False self.forceDownload = False self.forceUpload = False + self.isTorrent = False self.suproc = None self.thumb = None self.extensionFilter = [] @@ -150,7 +153,7 @@ async def beforeStart(self): if "excluded_extensions" not in self.userDict else ["aria2", "!qB"] ) - if not self.isYtDlp: + if not self.isYtDlp and not self.isJd: if self.link not in ["rcl", "gdl"]: await self.isTokenExists(self.link, "dl") elif self.link == "rcl": @@ -395,12 +398,12 @@ async def initBulk(self, input_list, bulk_start, bulk_end, obj): "Reply to text file or to telegram message that have links seperated by new line!", ) - async def proceedExtract(self, dl_path, size, gid): + async def proceedExtract(self, dl_path, gid): pswd = self.extract if isinstance(self.extract, str) else "" try: LOGGER.info(f"Extracting: {self.name}") async with task_dict_lock: - task_dict[self.mid] = ExtractStatus(self, size, gid) + task_dict[self.mid] = ExtractStatus(self, gid) if await aiopath.isdir(dl_path): if self.seed: self.newDir = f"{self.dir}10000" @@ -507,15 +510,15 @@ async def proceedExtract(self, dl_path, size, gid): self.newDir = "" return dl_path - async def proceedCompress(self, dl_path, size, gid): + async def proceedCompress(self, dl_path, gid): pswd = self.compress if isinstance(self.compress, str) else "" - if self.seed and self.isLeech: + if self.seed and self.isLeech and not self.newDir: self.newDir = f"{self.dir}10000" up_path = f"{self.newDir}/{self.name}.zip" else: up_path = f"{dl_path}.zip" async with task_dict_lock: - task_dict[self.mid] = ZipStatus(self, size, gid) + task_dict[self.mid] = ZipStatus(self, gid) if self.equalSplits: size = await get_path_size(dl_path) parts = -(-size // self.splitSize) @@ -560,7 +563,7 @@ async def proceedCompress(self, dl_path, size, gid): LOGGER.error(f"{stderr}. Unable to zip this path: {dl_path}") return dl_path - async def proceedSplit(self, up_dir, m_size, o_files, size, gid): + async def proceedSplit(self, up_dir, m_size, o_files, gid): checked = False for dirpath, _, files in await sync_to_async(walk, up_dir, topdown=False): for file_ in files: @@ -570,7 +573,7 @@ async def proceedSplit(self, up_dir, m_size, o_files, size, gid): if not checked: checked = True async with task_dict_lock: - task_dict[self.mid] = SplitStatus(self, size, gid) + task_dict[self.mid] = SplitStatus(self, gid) LOGGER.info(f"Splitting: {self.name}") res = await split_file( f_path, f_size, dirpath, self.splitSize, self @@ -593,7 +596,7 @@ async def proceedSplit(self, up_dir, m_size, o_files, size, gid): m_size.append(f_size) o_files.append(file_) - async def generateSampleVideo(self, dl_path, size, gid): + async def generateSampleVideo(self, dl_path, gid, unwanted_files): data = self.sampleVideo.split(":") if isinstance(self.sampleVideo, str) else "" if data: sample_duration = int(data[0]) if data[0] else 60 @@ -603,7 +606,7 @@ async def generateSampleVideo(self, dl_path, size, gid): part_duration = 4 async with task_dict_lock: - task_dict[self.mid] = SampleVideoStatus(self, size, gid) + task_dict[self.mid] = SampleVideoStatus(self, gid) async with cpu_eater_lock: checked = False @@ -612,15 +615,30 @@ async def generateSampleVideo(self, dl_path, size, gid): if not checked: checked = True LOGGER.info(f"Creating Sample video: {self.name}") - return await createSampleVideo( + res = await createSampleVideo( self, dl_path, sample_duration, part_duration, True ) + if res: + newfolder = ospath.splitext(dl_path)[0] + name = dl_path.rsplit("/", 1)[1] + if self.seed: + self.newDir = f"{self.dir}10000" + newfolder = newfolder.replace(self.dir, self.newDir) + await makedirs(newfolder, exist_ok=True) + if self.seed: + await copy2(dl_path, f"{newfolder}/{name}") + else: + await move(dl_path, f"{newfolder}/{name}") + await move(res, f"{newfolder}/SAMPLE.{name}") + return self.newDir else: for dirpath, _, files in await sync_to_async( walk, dl_path, topdown=False ): for file_ in files: f_path = ospath.join(dirpath, file_) + if f_path in unwanted_files: + continue if (await get_document_type(f_path))[0]: if not checked: checked = True @@ -632,9 +650,9 @@ async def generateSampleVideo(self, dl_path, size, gid): return res return dl_path - async def convertMedia(self, up_dir, size, gid): + async def convertMedia(self, dl_path, gid, o_files, m_size): async with task_dict_lock: - task_dict[self.mid] = MediaConvertStatus(self, size, gid) + task_dict[self.mid] = MediaConvertStatus(self, gid) fvext = [] if self.convertVideo: @@ -673,46 +691,79 @@ async def convertMedia(self, up_dir, size, gid): astatus = "" checked = False - for dirpath, _, files in await sync_to_async(walk, up_dir, topdown=False): - for file_ in files: - if self.cancelled: - return False - f_path = ospath.join(dirpath, file_) - is_video, is_audio, _ = await get_document_type(f_path) - if ( - is_video - and vext - and not f_path.endswith(f".{vext}") - and ( - vstatus == "+" - and f_path.endswith(tuple(fvext)) - or vstatus == "-" - and not f_path.endswith(tuple(fvext)) - or not vstatus - ) - ): - if not checked: - checked = True - LOGGER.info(f"Converting: {self.name}") - await convert_video(self, f_path, vext) - if self.cancelled: + + async def proceedConvert(m_path): + nonlocal checked + is_video, is_audio, _ = await get_document_type(m_path) + if ( + is_video + and vext + and not m_path.endswith(f".{vext}") + and ( + vstatus == "+" + and m_path.endswith(tuple(fvext)) + or vstatus == "-" + and not m_path.endswith(tuple(fvext)) + or not vstatus + ) + ): + if not checked: + checked = True + LOGGER.info(f"Converting: {self.name}") + res = await convert_video(self, m_path, vext) + return False if self.cancelled else res + elif ( + is_audio + and not is_video + and not m_path.endswith(f".{aext}") + and ( + astatus == "+" + and m_path.endswith(tuple(faext)) + or astatus == "-" + and not m_path.endswith(tuple(faext)) + or not astatus + ) + ): + if not checked: + checked = True + LOGGER.info(f"Converting: {self.name}") + res = await convert_audio(self, m_path, aext) + return False if self.cancelled else res + else: + return False + + if await aiopath.isfile(dl_path): + output_file = await proceedConvert(dl_path) + if output_file: + if self.seed: + self.newDir = f"{self.dir}10000" + new_output_file = output_file.replace(self.dir, self.newDir) + await makedirs(self.newDir, exist_ok=True) + await move(output_file, new_output_file) + return new_output_file + else: + try: + await remove(dl_path) + except: return False - elif ( - is_audio - and not is_video - and not f_path.endswith(f".{aext}") - and ( - astatus == "+" - and f_path.endswith(tuple(faext)) - or astatus == "-" - and not f_path.endswith(tuple(faext)) - or not astatus - ) - ): - if not checked: - checked = True - LOGGER.info(f"Converting: {self.name}") - await convert_audio(self, f_path, aext) + return output_file + return dl_path + else: + for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False): + for file_ in files: if self.cancelled: return False - return True + f_path = ospath.join(dirpath, file_) + res = await proceedConvert(f_path) + if res: + if self.seed and not self.newDir: + o_files.append(f_path) + fsize = await aiopath.getsize(f_path) + m_size.append(fsize) + else: + try: + await remove(f_path) + except: + return False + + return dl_path diff --git a/bot/helper/ext_utils/media_utils.py b/bot/helper/ext_utils/media_utils.py index 3aef316b176..14bb95e4fc4 100644 --- a/bot/helper/ext_utils/media_utils.py +++ b/bot/helper/ext_utils/media_utils.py @@ -1,6 +1,5 @@ from PIL import Image from aiofiles.os import remove, path as aiopath, makedirs -from aioshutil import move from asyncio import create_subprocess_exec, gather, wait_for from asyncio.subprocess import PIPE from os import path as ospath, cpu_count @@ -18,20 +17,23 @@ async def convert_video(listener, video_file, ext): output = f"{base_name}.{ext}" cmd = ["ffmpeg", "-i", video_file, "-c", "copy", output] if listener.cancelled: - return + return False async with subprocess_lock: listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) _, stderr = await listener.suproc.communicate() if listener.cancelled: - return + return False code = listener.suproc.returncode if code == 0: - await remove(video_file) - elif code != -9: + return output + elif code == -9: + return False + else: stderr = stderr.decode().strip() LOGGER.error( f"{stderr}. Something went wrong while converting video, mostly file is corrupted. Path: {video_file}" ) + return False async def convert_audio(listener, audio_file, ext): @@ -39,20 +41,23 @@ async def convert_audio(listener, audio_file, ext): output = f"{base_name}.{ext}" cmd = ["ffmpeg", "-i", audio_file, output] if listener.cancelled: - return + return False async with subprocess_lock: listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) _, stderr = await listener.suproc.communicate() if listener.cancelled: - return + return False code = listener.suproc.returncode if code == 0: - await remove(audio_file) - elif code != -9: + return output + elif code == -9: + return False + else: stderr = stderr.decode().strip() LOGGER.error( f"{stderr}. Something went wrong while converting audio, mostly file is corrupted. Path: {audio_file}" ) + return False async def createThumb(msg, _id=""): @@ -356,7 +361,9 @@ async def split_file( if listener.cancelled: return False code = listener.suproc.returncode - if code != 0: + if code == -9: + return False + elif code != 0: stderr = stderr.decode().strip() try: await remove(out_path) @@ -432,15 +439,15 @@ async def split_file( if listener.cancelled: return False code = listener.suproc.returncode - if code != 0: + if code == -9: + return False + elif code != 0: stderr = stderr.decode().strip() LOGGER.error(f"{stderr}. Split Document: {path}") return True -async def createSampleVideo( - listener, video_file, sample_duration, part_duration, oneFile=False -): +async def createSampleVideo(listener, video_file, sample_duration, part_duration): filter_complex = "" dir, name = video_file.rsplit("/", 1) output_file = f"{dir}/SAMPLE.{name}" @@ -491,19 +498,13 @@ async def createSampleVideo( return False listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE) _, stderr = await listener.suproc.communicate() + if listener.cancelled: + return False code = listener.suproc.returncode if code == -9: return False elif code == 0: - if oneFile: - newDir, _ = ospath.splitext(video_file) - await makedirs(newDir, exist_ok=True) - await gather( - move(video_file, f"{newDir}/{name}"), - move(output_file, f"{newDir}/SAMPLE.{name}"), - ) - return newDir - return True + return output_file else: stderr = stderr.decode().strip() LOGGER.error( diff --git a/bot/helper/listeners/aria2_listener.py b/bot/helper/listeners/aria2_listener.py index 2fb06686832..b5e76d3cfcd 100644 --- a/bot/helper/listeners/aria2_listener.py +++ b/bot/helper/listeners/aria2_listener.py @@ -28,6 +28,7 @@ async def _onDownloadStarted(api, gid): LOGGER.info(f"onDownloadStarted: {gid} METADATA") await sleep(1) if task := await getTaskByGid(gid): + task.listener.isTorrent = True if task.listener.select: metamsg = "Downloading Metadata, wait then you can select files. Use torrent file to avoid this wait." meta = await sendMessage(task.listener.message, metamsg) @@ -66,6 +67,7 @@ async def _onDownloadComplete(api, gid): new_gid = download.followed_by_ids[0] LOGGER.info(f"Gid changed from {gid} to {new_gid}") if task := await getTaskByGid(new_gid): + task.listener.isTorrent = True if config_dict["BASE_URL"] and task.listener.select: if not task.queued: await sync_to_async(api.client.force_pause, new_gid) @@ -74,6 +76,7 @@ async def _onDownloadComplete(api, gid): await sendMessage(task.listener.message, msg, SBUTTONS) elif download.is_torrent: if task := await getTaskByGid(gid): + task.listener.isTorrent = True if hasattr(task, "seeding") and task.seeding: LOGGER.info(f"Cancelling Seed: {download.name} onDownloadComplete") await task.listener.onUploadError( @@ -96,6 +99,7 @@ async def _onBtDownloadComplete(api, gid): return LOGGER.info(f"onBtDownloadComplete: {download.name} - Gid: {gid}") if task := await getTaskByGid(gid): + task.listener.isTorrent = True if task.listener.select: res = download.files for file_o in res: diff --git a/bot/helper/listeners/direct_listener.py b/bot/helper/listeners/direct_listener.py index 92124f0e355..3059e58550a 100644 --- a/bot/helper/listeners/direct_listener.py +++ b/bot/helper/listeners/direct_listener.py @@ -5,26 +5,25 @@ class DirectListener: - def __init__(self, total_size, path, listener, a2c_opt): + def __init__(self, path, listener, a2c_opt): + self.listener = listener self._path = path - self._listener = listener self._is_cancelled = False self._a2c_opt = a2c_opt self._proc_bytes = 0 self._failed = 0 - self.task = None - self.name = self._listener.name - self.total_size = total_size + self.download_task = None + self.name = self.listener.name @property def processed_bytes(self): - if self.task: - return self._proc_bytes + self.task.completed_length + if self.download_task: + return self._proc_bytes + self.download_task.completed_length return self._proc_bytes @property def speed(self): - return self.task.download_speed if self.task else 0 + return self.download_task.download_speed if self.download_task else 0 def download(self, contents): self.is_downloading = True @@ -38,43 +37,43 @@ def download(self, contents): filename = content["filename"] self._a2c_opt["out"] = filename try: - self.task = aria2.add_uris([content["url"]], self._a2c_opt, position=0) + self.download_task = aria2.add_uris([content["url"]], self._a2c_opt, position=0) except Exception as e: self._failed += 1 LOGGER.error(f"Unable to download {filename} due to: {e}") continue - self.task = self.task.live + self.download_task = self.download_task.live while True: if self._is_cancelled: - if self.task: - self.task.remove(True, True) + if self.download_task: + self.download_task.remove(True, True) break - self.task = self.task.live - if error_message := self.task.error_message: + self.download_task = self.download_task.live + if error_message := self.download_task.error_message: self._failed += 1 LOGGER.error( - f"Unable to download {self.task.name} due to: {error_message}" + f"Unable to download {self.download_task.name} due to: {error_message}" ) - self.task.remove(True, True) + self.download_task.remove(True, True) break - elif self.task.is_complete: - self._proc_bytes += self.task.total_length - self.task.remove(True) + elif self.download_task.is_complete: + self._proc_bytes += self.download_task.total_length + self.download_task.remove(True) break sleep(1) - self.task = None + self.download_task = None if self._is_cancelled: return if self._failed == len(contents): async_to_sync( - self._listener.onDownloadError, "All files are failed to download!" + self.listener.onDownloadError, "All files are failed to download!" ) return - async_to_sync(self._listener.onDownloadComplete) + async_to_sync(self.listener.onDownloadComplete) async def cancel_task(self): self._is_cancelled = True - LOGGER.info(f"Cancelling Download: {self._listener.name}") - await self._listener.onDownloadError("Download Cancelled by User!") - if self.task: - await sync_to_async(self.task.remove, force=True, files=True) + LOGGER.info(f"Cancelling Download: {self.listener.name}") + await self.listener.onDownloadError("Download Cancelled by User!") + if self.download_task: + await sync_to_async(self.download_task.remove, force=True, files=True) diff --git a/bot/helper/listeners/task_listener.py b/bot/helper/listeners/task_listener.py index f9ebed9a65f..a74bec2d8b4 100644 --- a/bot/helper/listeners/task_listener.py +++ b/bot/helper/listeners/task_listener.py @@ -113,6 +113,12 @@ async def onDownloadComplete(self): gid = download.gid() LOGGER.info(f"Download completed: {self.name}") + if not (self.isTorrent or self.isQbit): + self.seed = False + + unwanted_files = [] + unwanted_files_size = [] + if multi_links: await self.onUploadError("Downloaded! Waiting for other tasks...") return @@ -128,7 +134,7 @@ async def onDownloadComplete(self): return up_path = f"{self.dir}/{self.name}" - size = await get_path_size(up_path) + self.size = await get_path_size(up_path) if not config_dict["QUEUE_ALL"]: async with queue_dict_lock: if self.mid in non_queued_dl: @@ -139,41 +145,42 @@ async def onDownloadComplete(self): await join_files(up_path) if self.extract: - up_path = await self.proceedExtract(up_path, size, gid) + up_path = await self.proceedExtract(up_path, gid) if self.cancelled: return up_dir, self.name = up_path.rsplit("/", 1) - size = await get_path_size(up_dir) + self.size = await get_path_size(up_dir) if self.convertAudio or self.convertVideo: - up_dir, self.name = up_path.rsplit("/", 1) - await self.convertMedia(up_dir, size, gid) + up_path = await self.convertMedia( + up_path, gid, unwanted_files, unwanted_files_size + ) if self.cancelled: return - size = await get_path_size(up_dir) + up_dir, self.name = up_path.rsplit("/", 1) + self.size = await get_path_size(up_dir) if self.sampleVideo: - up_path = await self.generateSampleVideo(up_path, size, gid) + up_path = await self.generateSampleVideo(up_path, gid, unwanted_files) if self.cancelled: return up_dir, self.name = up_path.rsplit("/", 1) - size = await get_path_size(up_dir) + self.size = await get_path_size(up_dir) if self.compress: - up_path = await self.proceedCompress(up_path, size, gid) + up_path = await self.proceedCompress(up_path, gid) if self.cancelled: return up_dir, self.name = up_path.rsplit("/", 1) - size = await get_path_size(up_dir) + self.size = await get_path_size(up_dir) - if self.isLeech: - m_size = [] - o_files = [] - if not self.compress: - await self.proceedSplit(up_dir, m_size, o_files, size, gid) - if self.cancelled: - return + if self.isLeech and not self.compress: + await self.proceedSplit( + up_dir, unwanted_files_size, unwanted_files, gid + ) + if self.cancelled: + return if not (self.forceRun or self.forceUpload): add_to_queue, event = await check_running_tasks(self.mid, "up") @@ -181,7 +188,7 @@ async def onDownloadComplete(self): if add_to_queue: LOGGER.info(f"Added to Queue/Upload: {self.name}") async with task_dict_lock: - task_dict[self.mid] = QueueStatus(self, size, gid, "Up") + task_dict[self.mid] = QueueStatus(self, gid, "Up") await event.wait() async with task_dict_lock: if self.mid not in task_dict: @@ -191,40 +198,44 @@ async def onDownloadComplete(self): non_queued_up.add(self.mid) if self.isLeech: - size = await get_path_size(up_dir) - for s in m_size: - size -= s + self.size = await get_path_size(up_dir) + for s in unwanted_files_size: + self.size -= s LOGGER.info(f"Leech Name: {self.name}") tg = TgUploader(self, up_dir) async with task_dict_lock: - task_dict[self.mid] = TelegramStatus(self, tg, size, gid, "up") + task_dict[self.mid] = TelegramStatus(self, tg, gid, "up") await gather( update_status_message(self.message.chat.id), - tg.upload(o_files, m_size, size), + tg.upload(unwanted_files), ) elif is_gdrive_id(self.upDest): - size = await get_path_size(up_path) + self.size = await get_path_size(up_path) + for s in unwanted_files_size: + self.size -= s LOGGER.info(f"Gdrive Upload Name: {self.name}") drive = gdUpload(self, up_path) async with task_dict_lock: - task_dict[self.mid] = GdriveStatus(self, drive, size, gid, "up") + task_dict[self.mid] = GdriveStatus(self, drive, gid, "up") await gather( update_status_message(self.message.chat.id), - sync_to_async(drive.upload, size), + sync_to_async(drive.upload, unwanted_files), ) else: - size = await get_path_size(up_path) + self.size = await get_path_size(up_path) + for s in unwanted_files_size: + self.size -= s LOGGER.info(f"Rclone Upload Name: {self.name}") RCTransfer = RcloneTransferHelper(self) async with task_dict_lock: task_dict[self.mid] = RcloneStatus(self, RCTransfer, gid, "up") await gather( update_status_message(self.message.chat.id), - RCTransfer.upload(up_path, size), + RCTransfer.upload(up_path, unwanted_files), ) async def onUploadComplete( - self, link, size, files, folders, mime_type, rclonePath="", dir_id="" + self, link, files, folders, mime_type, rclonePath="", dir_id="" ): if ( self.isSuperChat @@ -232,7 +243,7 @@ async def onUploadComplete( and DATABASE_URL ): await DbManager().rm_complete_task(self.message.link) - msg = f"Name: {escape(self.name)}\n\nSize: {get_readable_file_size(size)}" + msg = f"Name: {escape(self.name)}\n\nSize: {get_readable_file_size(self.size)}" LOGGER.info(f"Task Done: {self.name}") if self.isLeech: msg += f"\nTotal Files: {folders}" diff --git a/bot/helper/mirror_utils/download_utils/direct_downloader.py b/bot/helper/mirror_utils/download_utils/direct_downloader.py index 9f099cc59db..4142ce336d7 100644 --- a/bot/helper/mirror_utils/download_utils/direct_downloader.py +++ b/bot/helper/mirror_utils/download_utils/direct_downloader.py @@ -22,7 +22,7 @@ async def add_direct_download(listener, path): if not (contents := details.get("contents")): await listener.onDownloadError("There is nothing to download!") return - size = details["total_size"] + listener.size = details["total_size"] if not listener.name: listener.name = details["title"] @@ -39,7 +39,7 @@ async def add_direct_download(listener, path): if add_to_queue: LOGGER.info(f"Added to Queue/Download: {listener.name}") async with task_dict_lock: - task_dict[listener.mid] = QueueStatus(listener, size, gid, "dl") + task_dict[listener.mid] = QueueStatus(listener, gid, "dl") await listener.onDownloadStart() if listener.multi <= 1: await sendStatusMessage(listener.message) @@ -56,7 +56,7 @@ async def add_direct_download(listener, path): a2c_opt["header"] = header a2c_opt["follow-torrent"] = "false" a2c_opt["follow-metalink"] = "false" - directListener = DirectListener(size, path, listener, a2c_opt) + directListener = DirectListener(path, listener, a2c_opt) async with task_dict_lock: task_dict[listener.mid] = DirectStatus(listener, directListener, gid) diff --git a/bot/helper/mirror_utils/download_utils/gd_download.py b/bot/helper/mirror_utils/download_utils/gd_download.py index 37b23a8e68f..91f89fcc3de 100644 --- a/bot/helper/mirror_utils/download_utils/gd_download.py +++ b/bot/helper/mirror_utils/download_utils/gd_download.py @@ -12,7 +12,7 @@ async def add_gd_download(listener, path): drive = gdCount() - name, mime_type, size, _, _ = await sync_to_async( + name, mime_type, listener.size, _, _ = await sync_to_async( drive.count, listener.link, listener.userId ) if mime_type is None: @@ -32,7 +32,7 @@ async def add_gd_download(listener, path): if add_to_queue: LOGGER.info(f"Added to Queue/Download: {listener.name}") async with task_dict_lock: - task_dict[listener.mid] = QueueStatus(listener, size, gid, "dl") + task_dict[listener.mid] = QueueStatus(listener, gid, "dl") await listener.onDownloadStart() if listener.multi <= 1: await sendStatusMessage(listener.message) @@ -45,7 +45,7 @@ async def add_gd_download(listener, path): drive = gdDownload(listener, path) async with task_dict_lock: - task_dict[listener.mid] = GdriveStatus(listener, drive, size, gid, "dl") + task_dict[listener.mid] = GdriveStatus(listener, drive, gid, "dl") async with queue_dict_lock: non_queued_dl.add(listener.mid) diff --git a/bot/helper/mirror_utils/download_utils/jd_download.py b/bot/helper/mirror_utils/download_utils/jd_download.py index 21772fa43cb..622b09d0174 100644 --- a/bot/helper/mirror_utils/download_utils/jd_download.py +++ b/bot/helper/mirror_utils/download_utils/jd_download.py @@ -127,7 +127,7 @@ async def add_jd_download(listener, path): start_time = time() online_packages = [] - size = 0 + listener.size = 0 corrupted_packages = [] gid = 0 remove_unknown = False @@ -179,7 +179,7 @@ async def add_jd_download(listener, path): ): remove_unknown = True - size += pack.get("bytesTotal", 0) + listener.size += pack.get("bytesTotal", 0) online_packages.append(pack["uuid"]) if save_to.startswith("/root/Downloads/"): await retry_function( @@ -252,7 +252,7 @@ async def add_jd_download(listener, path): if add_to_queue: LOGGER.info(f"Added to Queue/Download: {listener.name}") async with task_dict_lock: - task_dict[listener.mid] = QueueStatus(listener, size, f"{gid}", "dl") + task_dict[listener.mid] = QueueStatus(listener, f"{gid}", "dl") await listener.onDownloadStart() if listener.multi <= 1: await sendStatusMessage(listener.message) diff --git a/bot/helper/mirror_utils/download_utils/rclone_download.py b/bot/helper/mirror_utils/download_utils/rclone_download.py index da5e82bdde7..ff37becdd5f 100644 --- a/bot/helper/mirror_utils/download_utils/rclone_download.py +++ b/bot/helper/mirror_utils/download_utils/rclone_download.py @@ -62,7 +62,7 @@ async def add_rclone_download(listener, path): path += listener.name else: listener.name = listener.link.rsplit("/", 1)[-1] - size = rsize["bytes"] + listener.size = rsize["bytes"] gid = token_urlsafe(12) msg, button = await stop_duplicate_check(listener) @@ -75,7 +75,7 @@ async def add_rclone_download(listener, path): if add_to_queue: LOGGER.info(f"Added to Queue/Download: {listener.name}") async with task_dict_lock: - task_dict[listener.mid] = QueueStatus(listener, size, gid, "dl") + task_dict[listener.mid] = QueueStatus(listener, gid, "dl") await listener.onDownloadStart() if listener.multi <= 1: await sendStatusMessage(listener.message) diff --git a/bot/helper/mirror_utils/download_utils/telegram_download.py b/bot/helper/mirror_utils/download_utils/telegram_download.py index 3eb427fe30a..e86dd84cb91 100644 --- a/bot/helper/mirror_utils/download_utils/telegram_download.py +++ b/bot/helper/mirror_utils/download_utils/telegram_download.py @@ -35,13 +35,13 @@ def speed(self): def processed_bytes(self): return self._processed_bytes - async def _onDownloadStart(self, size, file_id, from_queue): + async def _onDownloadStart(self, file_id, from_queue): async with global_lock: GLOBAL_GID.add(file_id) self._id = file_id async with task_dict_lock: task_dict[self._listener.mid] = TelegramStatus( - self._listener, self, size, file_id[:12], "dl" + self._listener, self, file_id[:12], "dl" ) async with queue_dict_lock: non_queued_dl.add(self._listener.mid) @@ -126,7 +126,7 @@ async def add_download(self, message, path): ) else: path = path + self._listener.name - size = media.file_size + self._listener.size = media.file_size gid = media.file_unique_id msg, button = await stop_duplicate_check(self._listener) @@ -140,7 +140,7 @@ async def add_download(self, message, path): LOGGER.info(f"Added to Queue/Download: {self._listener.name}") async with task_dict_lock: task_dict[self._listener.mid] = QueueStatus( - self._listener, size, gid, "dl" + self._listener, gid, "dl" ) await self._listener.onDownloadStart() if self._listener.multi <= 1: @@ -151,7 +151,7 @@ async def add_download(self, message, path): return else: add_to_queue = False - await self._onDownloadStart(size, gid, add_to_queue) + await self._onDownloadStart(gid, add_to_queue) await self._download(message, path) else: await self._onDownloadError("File already being downloaded!") diff --git a/bot/helper/mirror_utils/download_utils/yt_dlp_download.py b/bot/helper/mirror_utils/download_utils/yt_dlp_download.py index edfda9f948f..afe53bc20da 100644 --- a/bot/helper/mirror_utils/download_utils/yt_dlp_download.py +++ b/bot/helper/mirror_utils/download_utils/yt_dlp_download.py @@ -43,7 +43,6 @@ def error(msg): class YoutubeDLHelper: def __init__(self, listener): self._last_downloaded = 0 - self._size = 0 self._progress = 0 self._downloaded_bytes = 0 self._download_speed = 0 @@ -84,7 +83,7 @@ def downloaded_bytes(self): @property def size(self): - return self._size + return self._listener.size @property def progress(self): @@ -110,13 +109,13 @@ def _onDownloadProgress(self, d): self._downloaded_bytes += chunk_size else: if d.get("total_bytes"): - self._size = d["total_bytes"] + self._listener.size = d["total_bytes"] elif d.get("total_bytes_estimate"): - self._size = d["total_bytes_estimate"] + self._listener.size = d["total_bytes_estimate"] self._downloaded_bytes = d["downloaded_bytes"] self._eta = d.get("eta", "-") or "-" try: - self._progress = (self._downloaded_bytes / self._size) * 100 + self._progress = (self._downloaded_bytes / self._listener.size) * 100 except: pass @@ -149,9 +148,9 @@ def extractMetaData(self): if not entry: continue elif "filesize_approx" in entry: - self._size += entry["filesize_approx"] + self._listener.size += entry["filesize_approx"] elif "filesize" in entry: - self._size += entry["filesize"] + self._listener.size += entry["filesize"] if not self._listener.name: outtmpl_ = "%(series,playlist_title,channel)s%(season_number& |)s%(season_number&S|)s%(season_number|)02d.%(ext)s" self._listener.name, ext = ospath.splitext( @@ -318,7 +317,7 @@ async def add_download(self, path, qual, playlist, options): LOGGER.info(f"Added to Queue/Download: {self._listener.name}") async with task_dict_lock: task_dict[self._listener.mid] = QueueStatus( - self._listener, self._size, self._gid, "dl" + self._listener, self._gid, "dl" ) await event.wait() async with task_dict_lock: diff --git a/bot/helper/mirror_utils/gdrive_utils/clone.py b/bot/helper/mirror_utils/gdrive_utils/clone.py index cf856b76bad..2442cdcebd1 100644 --- a/bot/helper/mirror_utils/gdrive_utils/clone.py +++ b/bot/helper/mirror_utils/gdrive_utils/clone.py @@ -50,7 +50,6 @@ def clone(self): None, None, None, - None, ) self.service = self.authorize() msg = "" @@ -67,19 +66,18 @@ def clone(self): self.service.files().delete( fileId=dir_id, supportsAllDrives=True ).execute() - return None, None, None, None, None, None + return None, None, None, None, None mime_type = "Folder" - size = self.proc_bytes + self.listener.size = self.proc_bytes else: file = self._copyFile(meta.get("id"), self.listener.upDest) msg += f'Name: {file.get("name")}' durl = self.G_DRIVE_BASE_DOWNLOAD_URL.format(file.get("id")) if mime_type is None: mime_type = "File" - size = int(meta.get("size", 0)) + self.listener.size = int(meta.get("size", 0)) return ( durl, - size, mime_type, self.total_files, self.total_folders, diff --git a/bot/helper/mirror_utils/gdrive_utils/upload.py b/bot/helper/mirror_utils/gdrive_utils/upload.py index 157fff336bb..9539b34fdfa 100644 --- a/bot/helper/mirror_utils/gdrive_utils/upload.py +++ b/bot/helper/mirror_utils/gdrive_utils/upload.py @@ -39,7 +39,7 @@ def user_setting(self): self.listener.upDest = self.listener.upDest.replace("sa:", "", 1) self.use_sa = True - def upload(self, size): + def upload(self, unwanted_files): self.user_setting() self.service = self.authorize() LOGGER.info(f"Uploading: {self._path}") @@ -69,7 +69,7 @@ def upload(self, size): ospath.basename(ospath.abspath(self.listener.name)), self.listener.upDest, ) - result = self._upload_dir(self._path, dir_id) + result = self._upload_dir(self._path, dir_id, unwanted_files) if result is None: raise Exception("Upload has been manually cancelled!") link = self.G_DRIVE_DIR_BASE_DOWNLOAD_URL.format(dir_id) @@ -97,14 +97,13 @@ def upload(self, size): async_to_sync( self.listener.onUploadComplete, link, - size, self.total_files, self.total_folders, mime_type, dir_id=self.getIdFromUrl(link), ) - def _upload_dir(self, input_directory, dest_id): + def _upload_dir(self, input_directory, dest_id, unwanted_files): list_dirs = listdir(input_directory) if len(list_dirs) == 0: return dest_id @@ -113,9 +112,13 @@ def _upload_dir(self, input_directory, dest_id): current_file_name = ospath.join(input_directory, item) if ospath.isdir(current_file_name): current_dir_id = self.create_directory(item, dest_id) - new_id = self._upload_dir(current_file_name, current_dir_id) + new_id = self._upload_dir( + current_file_name, current_dir_id, unwanted_files + ) self.total_folders += 1 - elif not item.lower().endswith(tuple(self.listener.extensionFilter)): + elif current_file_name not in unwanted_files and not item.lower().endswith( + tuple(self.listener.extensionFilter) + ): mime_type = get_mime_type(current_file_name) file_name = current_file_name.split("/")[-1] self._upload_file(current_file_name, file_name, mime_type, dest_id) diff --git a/bot/helper/mirror_utils/rclone_utils/transfer.py b/bot/helper/mirror_utils/rclone_utils/transfer.py index 30e613f9250..b1b3a7eec60 100644 --- a/bot/helper/mirror_utils/rclone_utils/transfer.py +++ b/bot/helper/mirror_utils/rclone_utils/transfer.py @@ -264,7 +264,7 @@ async def _start_upload(self, cmd, remote_type): else: return True - async def upload(self, path, size): + async def upload(self, path, unwanted_files): self._is_upload = True rc_path = self._listener.upDest.strip("/") if rc_path.startswith("mrcc:"): @@ -317,7 +317,7 @@ async def upload(self, path, size): method = "move" if not self._listener.seed or self._listener.newDir else "copy" cmd = self._getUpdatedCommand( - fconfig_path, path, f"{fremote}:{rc_path}", method + fconfig_path, path, f"{fremote}:{rc_path}", method, unwanted_files ) if ( remote_type == "drive" @@ -354,7 +354,7 @@ async def upload(self, path, size): return LOGGER.info(f"Upload Done. Path: {destination}") await self._listener.onUploadComplete( - link, size, files, folders, mime_type, destination + link, files, folders, mime_type, destination ) async def clone(self, config_path, src_remote, src_path, mime_type): @@ -423,7 +423,9 @@ async def clone(self, config_path, src_remote, src_path, mime_type): ) return None, destination - def _getUpdatedCommand(self, config_path, source, destination, method): + def _getUpdatedCommand( + self, config_path, source, destination, method, unwanted_files=[] + ): ext = "*.{" + ",".join(self._listener.extensionFilter) + "}" cmd = [ "rclone", @@ -455,6 +457,9 @@ def _getUpdatedCommand(self, config_path, source, destination, method): cmd.extend((key, value)) elif len(flag) > 0: cmd.append(flag.strip()) + if unwanted_files: + for f in unwanted_files: + cmd.extend(("--exclude", f)) return cmd @staticmethod diff --git a/bot/helper/mirror_utils/status_utils/direct_status.py b/bot/helper/mirror_utils/status_utils/direct_status.py index 133c3d1036a..3223a73528f 100644 --- a/bot/helper/mirror_utils/status_utils/direct_status.py +++ b/bot/helper/mirror_utils/status_utils/direct_status.py @@ -16,7 +16,7 @@ def gid(self): def progress_raw(self): try: - return self._obj.processed_bytes / self._obj.total_size * 100 + return self._obj.processed_bytes / self.listener.size * 100 except: return 0 @@ -30,12 +30,12 @@ def name(self): return self.listener.name def size(self): - return get_readable_file_size(self._obj.total_size) + return get_readable_file_size(self.listener.size) def eta(self): try: seconds = ( - self._obj.total_size - self._obj.processed_bytes + self.listener.size - self._obj.processed_bytes ) / self._obj.speed return get_readable_time(seconds) except: diff --git a/bot/helper/mirror_utils/status_utils/extract_status.py b/bot/helper/mirror_utils/status_utils/extract_status.py index 77cb7c5ebf0..8c78385191d 100644 --- a/bot/helper/mirror_utils/status_utils/extract_status.py +++ b/bot/helper/mirror_utils/status_utils/extract_status.py @@ -11,11 +11,11 @@ class ExtractStatus: - def __init__(self, listener, size, gid): - self._size = size + def __init__(self, listener, gid): + self.listener = listener + self._size = self.listener.size self._gid = gid self._start_time = time() - self.listener = listener def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/status_utils/gdrive_status.py b/bot/helper/mirror_utils/status_utils/gdrive_status.py index c75cc447b97..1c93a42ce97 100644 --- a/bot/helper/mirror_utils/status_utils/gdrive_status.py +++ b/bot/helper/mirror_utils/status_utils/gdrive_status.py @@ -6,12 +6,12 @@ class GdriveStatus: - def __init__(self, listener, obj, size, gid, status): + def __init__(self, listener, obj, gid, status): + self.listener = listener self._obj = obj - self._size = size + self._size = self.listener.size self._gid = gid self._status = status - self.listener = listener def processed_bytes(self): return get_readable_file_size(self._obj.processed_bytes) diff --git a/bot/helper/mirror_utils/status_utils/media_convert_status.py b/bot/helper/mirror_utils/status_utils/media_convert_status.py index 41fb717324c..3c06082a3ae 100644 --- a/bot/helper/mirror_utils/status_utils/media_convert_status.py +++ b/bot/helper/mirror_utils/status_utils/media_convert_status.py @@ -3,10 +3,10 @@ class MediaConvertStatus: - def __init__(self, listener, size, gid): - self._gid = gid - self._size = size + def __init__(self, listener, gid): self.listener = listener + self._gid = gid + self._size = self.listener.size def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/status_utils/queue_status.py b/bot/helper/mirror_utils/status_utils/queue_status.py index 27976b0d3b7..c73ebfd0a5e 100644 --- a/bot/helper/mirror_utils/status_utils/queue_status.py +++ b/bot/helper/mirror_utils/status_utils/queue_status.py @@ -3,11 +3,11 @@ class QueueStatus: - def __init__(self, listener, size, gid, status): - self._size = size + def __init__(self, listener, gid, status): + self.listener = listener + self._size = self.listener.size self._gid = gid self._status = status - self.listener = listener def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/status_utils/sample_video_status.py b/bot/helper/mirror_utils/status_utils/sample_video_status.py index 98ec5a906c8..748f637d3dc 100644 --- a/bot/helper/mirror_utils/status_utils/sample_video_status.py +++ b/bot/helper/mirror_utils/status_utils/sample_video_status.py @@ -3,10 +3,10 @@ class SampleVideoStatus: - def __init__(self, listener, size, gid): - self._gid = gid - self._size = size + def __init__(self, listener, gid): self.listener = listener + self._gid = gid + self._size = self.listener.size def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/status_utils/split_status.py b/bot/helper/mirror_utils/status_utils/split_status.py index ea7b62fd04c..8bca6804660 100644 --- a/bot/helper/mirror_utils/status_utils/split_status.py +++ b/bot/helper/mirror_utils/status_utils/split_status.py @@ -3,10 +3,10 @@ class SplitStatus: - def __init__(self, listener, size, gid): - self._gid = gid - self._size = size + def __init__(self, listener, gid): self.listener = listener + self._gid = gid + self._size = self.listener.size def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/status_utils/telegram_status.py b/bot/helper/mirror_utils/status_utils/telegram_status.py index 54840244c42..cdc23859bc1 100644 --- a/bot/helper/mirror_utils/status_utils/telegram_status.py +++ b/bot/helper/mirror_utils/status_utils/telegram_status.py @@ -6,12 +6,12 @@ class TelegramStatus: - def __init__(self, listener, obj, size, gid, status): + def __init__(self, listener, obj, gid, status): + self.listener = listener self._obj = obj - self._size = size + self._size = self.listener.size self._gid = gid self._status = status - self.listener = listener def processed_bytes(self): return get_readable_file_size(self._obj.processed_bytes) diff --git a/bot/helper/mirror_utils/status_utils/zip_status.py b/bot/helper/mirror_utils/status_utils/zip_status.py index eeaeb6779c0..5ba9fc20ce3 100644 --- a/bot/helper/mirror_utils/status_utils/zip_status.py +++ b/bot/helper/mirror_utils/status_utils/zip_status.py @@ -11,11 +11,11 @@ class ZipStatus: - def __init__(self, listener, size, gid): - self._size = size + def __init__(self, listener, gid): + self.listener = listener + self._size = self.listener.size self._gid = gid self._start_time = time() - self.listener = listener def gid(self): return self._gid diff --git a/bot/helper/mirror_utils/telegram_uploader.py b/bot/helper/mirror_utils/telegram_uploader.py index ef462e36215..676df1f8e93 100644 --- a/bot/helper/mirror_utils/telegram_uploader.py +++ b/bot/helper/mirror_utils/telegram_uploader.py @@ -225,7 +225,7 @@ async def _send_media_group(self, subkey, key, msgs): self._msgs_dict[m.link] = m.caption self._sent_msg = msgs_list[-1] - async def upload(self, o_files, m_size, size): + async def upload(self, o_files): await self._user_settings() res = await self._msg_to_reply() if not res: @@ -241,7 +241,10 @@ async def upload(self, o_files, m_size, size): continue try: f_size = await aiopath.getsize(self._up_path) - if self._listener.seed and file_ in o_files and f_size in m_size: + if ( + self._listener.seed + and self._up_path in o_files + ): continue self._total_files += 1 if f_size == 0: @@ -323,7 +326,7 @@ async def upload(self, o_files, m_size, size): return LOGGER.info(f"Leech Completed: {self._listener.name}") await self._listener.onUploadComplete( - None, size, self._msgs_dict, self._total_files, self._corrupted + None, self._msgs_dict, self._total_files, self._corrupted ) @retry( @@ -383,23 +386,6 @@ async def _upload_file(self, cap_mono, file, force_document=False): else: width = 480 height = 320 - if not self._up_path.upper().endswith(("MP4", "MKV")): - dirpath, file_ = self._up_path.rsplit("/", 1) - if ( - self._listener.seed - and not self._listener.newDir - and not dirpath.endswith("/splited_files_mltb") - ): - dirpath = f"{dirpath}/copied_mltb" - await makedirs(dirpath, exist_ok=True) - new_path = ospath.join( - dirpath, f"{ospath.splitext(file_)[0]}.mp4" - ) - self._up_path = await copy(self._up_path, new_path) - else: - new_path = f"{ospath.splitext(self._up_path)[0]}.mp4" - await rename(self._up_path, new_path) - self._up_path = new_path if self._is_cancelled: return self._sent_msg = await self._sent_msg.reply_video( diff --git a/bot/modules/clone.py b/bot/modules/clone.py index 5607cffb5ef..8ac033eb9d5 100644 --- a/bot/modules/clone.py +++ b/bot/modules/clone.py @@ -126,7 +126,7 @@ async def _proceedToClone(self): await sendMessage(self.message, str(e)) return if is_gdrive_link(self.link) or is_gdrive_id(self.link): - self.name, mime_type, size, files, _ = await sync_to_async( + self.name, mime_type, self.size, files, _ = await sync_to_async( gdCount().count, self.link, self.userId ) if mime_type is None: @@ -147,10 +147,10 @@ async def _proceedToClone(self): msg = "" gid = token_urlsafe(12) async with task_dict_lock: - task_dict[self.mid] = GdriveStatus(self, drive, size, gid, "cl") + task_dict[self.mid] = GdriveStatus(self, drive, gid, "cl") if self.multi <= 1: await sendStatusMessage(self.message) - flink, size, mime_type, files, folders, dir_id = await sync_to_async( + flink, mime_type, files, folders, dir_id = await sync_to_async( drive.clone ) if msg: @@ -158,7 +158,7 @@ async def _proceedToClone(self): if not flink: return await self.onUploadComplete( - flink, size, files, folders, mime_type, dir_id=dir_id + flink, files, folders, mime_type, dir_id=dir_id ) LOGGER.info(f"Cloning Done: {self.name}") elif is_rclone_path(self.link): @@ -256,7 +256,7 @@ async def _proceedToClone(self): return files = None folders = None - size = 0 + self.size = 0 LOGGER.error( f"Error: While getting rclone stat. Path: {destination}. Stderr: {res1[1][:4000]}" ) @@ -264,9 +264,9 @@ async def _proceedToClone(self): files = len(res1[0].split("\n")) folders = len(res2[0].strip().split("\n")) if res2[0] else 0 rsize = loads(res3[0]) - size = rsize["bytes"] + self.size = rsize["bytes"] await self.onUploadComplete( - flink, size, files, folders, mime_type, destination + flink, files, folders, mime_type, destination ) else: await sendMessage(self.message, CLONE_HELP_MESSAGE) diff --git a/bot/modules/mirror_leech.py b/bot/modules/mirror_leech.py index aab30d0cace..afcab66adf6 100644 --- a/bot/modules/mirror_leech.py +++ b/bot/modules/mirror_leech.py @@ -309,12 +309,12 @@ async def newEvent(self): await sendMessage(self.message, f"{e}".strip()) self.removeFromSameDir() return + elif self.isQbit: + await add_qb_torrent(self, path, ratio, seed_time) elif is_rclone_path(self.link): await add_rclone_download(self, f"{path}/") elif is_gdrive_link(self.link) or is_gdrive_id(self.link): await add_gd_download(self, path) - elif self.isQbit: - await add_qb_torrent(self, path, ratio, seed_time) else: ussr = args["-au"] pssw = args["-ap"]