diff --git a/Util/Download.py b/Util/Download.py index 655068d..19e4a21 100644 --- a/Util/Download.py +++ b/Util/Download.py @@ -12,202 +12,215 @@ Change Log : 2022/08/11 22:02:49 : Init 2022/08/30 00:30:09 : Add ImageDownload() +2023/08/04 00:48:30 : Add trim_filename(),download_file(),AwemeDownload() and Delete some unuseful function ------------------------------------------------- ''' import Util +XB = Util.XBogus() +URLS = Util.Urls() -class Download(): - - def __init__(self): - self.urls = Util.Urls() - # XB - self.XB = Util.XBogus() - - def VideoDownload(self, profileData): - self.headers = profileData.headers - # 检查已经下载的作品 - self.check = Util.CheckInfo() - self.like_counts = 0 - self.new_video_list = [] - # 生成1080p分辨率的视频链接 - self.uri_url = 'https://aweme.snssdk.com/aweme/v1/play/?video_id=%s&radio=1080p&line=0' - # 视频原声 - self.music = profileData.music - # 下载模式 - self.mode = profileData.mode - # 下载路径 - self.path = profileData.path - # 名称列表 - self.author_list = profileData.author_list - # self.video_list = profileData.video_list - # 作品uri列表 - # self.uri_list = profileData.uri_list - # 作品播放地址列表 - self.url_list = profileData.url_list - # 作品id列表 - self.aweme_id = profileData.aweme_id - # 作者 - self.nickname = profileData.nickname - # 页码 - self.max_cursor = profileData.max_cursor - # 系统分隔符 - self.sprit = profileData.sprit - # self.v_info = profileData.v_info - # self.profile = Profile() - with Util.progress: - with Util.ThreadPoolExecutor(max_workers=5) as pool: - for i in range(len(self.author_list)): - # 获取单部视频接口信息 - try: - # 官方接口 - # 旧接口22/12/23失效 - # jx_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={self.aweme_id[i]}' - # 23/01/11 - # 此ies domian暂时不需要xg参数 - # 单作品接口 'aweme_detail' - # 主页作品 'aweme_list' - # 23/02/09 更新xg参数 - jx_url = Util.Urls().POST_DETAIL + self.XB.getXBogus( - f'aweme_id={self.aweme_id[i]}&aid=6383&cookie_enabled=true&platform=PC&downlink=10')[0] - js = Util.requests.get( - url=jx_url, headers=self.headers).text - # 防止接口多次返回空 - while js == '': - js = Util.requests.get( - url=jx_url, headers=self.headers).text - js = Util.json.loads(js) - creat_time = Util.time.strftime( - "%Y-%m-%d %H.%M.%S", Util.time.localtime(js['aweme_detail']['create_time'])) - except Exception as videoNotFound: - Util.log.warning(videoNotFound) - print('[ 🚩🚩 ]:由于官方接口cdn缓存暂没过期,id:%s的视频已经不存在!\r' % - self.author_list[i]) - Util.log.warning( - f'[ 🚩🚩 ]: {self.nickname} 的视频 {self.author_list[i]} 下载失败') - continue - - # Code From RobotJohns https://github.com/RobotJohns - # 移除文件名称 /r/n - self.author_list[i] = ''.join( - self.author_list[i].splitlines()) - if len(self.author_list[i]) > 182: - print("[ 提示 ]:", "文件名称太长 进行截取") - self.author_list[i] = self.author_list[i][0:180] - print("[ 提示 ]:", "截取后的文案:{0},长度:{1}".format( - self.author_list[i], len(self.author_list[i]))) - - # 检查视频下载情况 - file_state = self.check.test( - self.path, creat_time, self.author_list[i], ".mp4") - if file_state == True: - continue +class Download: + + def __init__(self, config): + # 配置文件 + self.config = config + # 文件检查是否存在 + self.check = Util.Check() + # 进度控制台 + self.progress_console = Util.Console(width=150) + # 进度条 + self.progress = Util.Progress( + Util.TextColumn("{task.description}[bold blue]{task.fields[filename]}", justify="left"), + Util.BarColumn(bar_width=20), + "[progress.percentage]{task.percentage:>3.1f}%", + "•", + Util.DownloadColumn(), + "•", + Util.TransferSpeedColumn(), + "•", + Util.TimeRemainingColumn(), + console=self.progress_console + ) + + self.done_event = Util.asyncio.Event() + + def handle_sigint(signum, frame): + self.done_event.set() + + Util.signal.signal(Util.signal.SIGINT, handle_sigint) + + + def trim_filename(self, filename: str, max_length: int = 50) -> str: + """ + 裁剪文件名以适应控制台显示。 + + Args: + filename (str): 完整的文件名。 + max_length (int): 显示的最大字符数。 + + Returns: + str: 裁剪后的文件名。 + """ + + if len(filename) > max_length: + prefix_suffix_len = max_length // 2 - 2 # 确保前缀和后缀长度相等,并且在中间留有四个字符用于省略号("...") + return f"{filename[:prefix_suffix_len]}...{filename[-prefix_suffix_len:]}" + else: + return filename + + async def download_file(self, task_id: Util.TaskID, url: str, path: str) -> None: + """ + 下载指定 URL 的文件并将其保存在本地路径。 + + Args: + task_id (TaskID): 下载任务的唯一标识。 + url (str): 要下载的文件的 URL。 + path (str): 文件保存的本地路径. + """ + + async with Util.aiohttp.ClientSession() as session: + try: + async with session.get(url) as response: + self.progress.update(task_id, total=int(response.headers["Content-length"])) + with open(path, "wb") as dest_file: + self.progress.start_task(task_id) + chunk_size = 32768 + while not self.done_event.is_set(): + chunk = await response.content.read(chunk_size) + if not chunk: + break + dest_file.write(chunk) + self.progress.update(task_id, advance=len(chunk)) + except Exception as e: + Util.console.print(f"[ 失败 ]:下载失败,异常:{e}") + return + + async def AwemeDownload(self, aweme_data): + """ + 此函数用于从抖音作品数据列表(aweme_data)中异步下载音乐、视频和图集。 + 它会根据aweme_type的类型来决定是下载视频还是图集,并且会根据配置文件来判断是否需要下载音乐。 + 下载的文件会保存在用户目录下,并且会创建一个与抖音作品文案对应的子目录。 + 如果文件已经存在,作品将不会重新下载。 + + 文件的命名规范: + file_name: 文件名:ctime_f + desc + 文件类型。 例如'2021-02-15 18.09.05测试.mp4',用于控制台显示。 + file_path: 绝对路径的文件名:绝对路径 + ctime_f和作品同名目录 + file_name。例如'H:\\TikTokDownload\\Download\\post\\小e同学\\2021-02-15 18.09.05测试\\2021-02-15 18.09.05测试.mp4',用于文件保存。 + 作品文件夹的命名规范: ctime_f和作品同名目录。例如 2021-02-15 18.09.05测试 + + Args: + aweme_data (list): 抖音数据列表,列表的每个元素都是一个字典,字典包含了抖音的各种信息,如aweme_type, path, desc等。 + """ + + # 遍历aweme_data中的每一个aweme字典 + for aweme in aweme_data: + # 将UNIX时间戳转换为格式化的字符串 + ctime_f = Util.time.strftime('%Y-%m-%d %H.%M.%S', Util.time.localtime((aweme['create_time']))) + # 获取文件的基础路径,这里的aweme['path']是到用户目录的绝对路径 + base_path = aweme['path'] + # 创建子目录名称 + subdir_name = f'{ctime_f}_{aweme["desc"]}' + # 根据视频描述创建子目录 + desc_path = Util.os.path.join(base_path, subdir_name) + # 确保子目录存在,如果不存在,os.makedirs会自动创建 + Util.os.makedirs(desc_path, exist_ok=True) + + # 如果配置文件设置为下载音乐 + if self.config['music'].lower() == 'yes': + try: + # 尝试获取音乐的URL + music_url = aweme['music_play_url']['url_list'][0] + # 创建音乐文件名 + music_file_name = f'{ctime_f}_{aweme["desc"]}_music' + # 创建相对路径的文件名 + music_file_path = f'{music_file_name}.mp3' + # 创建绝对路径的文件名 + music_full_path = Util.os.path.join(desc_path, music_file_path) + # 检查音乐文件是否已经存在 + music_state = Util.os.path.exists(music_full_path) + # 如果音乐文件存在,则创建一个已完成的任务 + if music_state: + task_id = self.progress.add_task(description="[ 跳过 ]:", + filename=self.trim_filename(music_file_path, 50), + total=1, completed=1) + Util.log.info(f"repeat task created with ID: {task_id}") + self.progress.update(task_id, completed=1) else: - pass - - # 尝试下载音频 - try: - if self.music == "yes": - music_url = str( - js['aweme_detail']['music']['play_url']['url_list'][0]) - music_title = str( - js['aweme_detail']['music']['author']) + '创作的视频原声' - m_url = self.path + self.sprit + creat_time + Util.re.sub( - r'[\\/:*?"<>|\r\n]+', "_", music_title) + '_' + self.author_list[i] + '.mp3' - if len(self.author_list[i]) > 20: - filename = creat_time[:10] + self.author_list[i][:20] + "..." - else: - filename = creat_time[:10] + self.author_list[i] - task_id = Util.progress.add_task( - description = "[ 原声 ]:", filename=filename, start=False) - pool.submit(Util.copy_url, task_id, - music_url, self.author_list[i], m_url) - Util.log.info(m_url) - except Exception as e: - Util.log.error(e) - print('[ ❌ ]:%s\r' % e) - print('\r[ 警告 ]:下载音频出错!\r') - Util.log.error('[ ❌ ]:下载音频出错!') - - # 尝试下载视频 - try: - # 生成1080p视频链接 - # self.new_video_list.append( - # self.uri_url % self.uri_list[i]) - # 2023/04/20 1080p不再通过拼接uri获取,url_list为1080p - self.new_video_list.append(self.url_list[i]) - try: - v_url = self.path + self.sprit + creat_time + Util.re.sub( - r'[\\/:*?"<>|\r\n] + ', "_", self.author_list[i]) + '.mp4' - if len(self.author_list[i]) > 20: - filename = creat_time[:10] + self.author_list[i][:20] + "..." - else: - filename = creat_time[:10] + self.author_list[i] - task_id = Util.progress.add_task( - description = "[ 视频 ]:", filename=filename, start=False) - pool.submit( - Util.copy_url, task_id, self.new_video_list[0], self.author_list[i], v_url) - Util.log.info(v_url) - # 清除每个旧的视频列表 - self.new_video_list = [] - except Exception as videoError: - Util.log.error(videoError) - print('[ ❌ ]:%s\r' % videoError) - Util.log.error('[ 警告 ]:下载视频出错!') - print('[ 警告 ]:下载视频出错!') - - except Exception as PageNoFull: - Util.log.error(PageNoFull) - print('[ ❌ ]:%s\r' % PageNoFull) - Util.log.error('[ 提示 ]:该页视频资源没有35个,为你跳过该页!') - print('[ 提示 ]:该页视频资源没有35个,为你跳过该页!\r') - break - - def ImageDownload(self, datas): - with Util.progress: - with Util.ThreadPoolExecutor(max_workers=5) as pool: - for i in range(len(datas)): - self.nickname = datas[i][0] - self.desc = Util.replaceT(datas[i][1]) - self.create_time = Util.time.strftime( - '%Y-%m-%d %H.%M.%S', Util.time.localtime(datas[i][2])) - self.position = datas[i][3] - self.number = datas[i][4] - self.images = datas[i][5] - self.sprit = Util.sprit - - path = "Download" + self.sprit + "pic" + self.sprit + \ - self.nickname + self.sprit + self.create_time + self.desc - # 检测下载目录是否存在 - if not Util.os.path.exists(path): - Util.os.makedirs(path) - - for i in range(self.number): - # 图片目录 - p_url = 'Download' + self.sprit + 'pic' + self.sprit + self.nickname + self.sprit + \ - self.create_time + self.desc + self.sprit + \ - self.create_time + self.desc + \ - '_' + str(i) + '.jpeg' - # 检查图片下载情况 - if Util.os.path.exists(p_url): - continue + # 如果音乐文件不存在,则创建一个新的下载任务 + task_id = self.progress.add_task(description="[ 音乐 ]:", + filename=self.trim_filename(music_file_path, 50), + start=False) + Util.log.info(f"New task created with ID: {task_id}") + Util.asyncio.create_task(self.download_file(task_id, music_url, music_full_path)) + except IndexError: + # 如果无法提取音乐URL,则跳过下载该音乐 + Util.console.print("[ 提示 ]:该原声不可用,无法下载。") + Util.log.warning(f"[ 提示 ]:该原声不可用,无法下载。{IndexError}") + pass + + # 根据aweme的类型下载视频或图集 + if aweme['aweme_type'] == 0: # 如果aweme类型为0,下载视频 + try: + # 获取视频的URL + video_url = aweme['video_url_list'][0] + # 创建视频文件名 + video_file_name = f'{ctime_f}_{aweme["desc"]}_video' + # 创建相对路径的文件名 + video_file_path = f'{video_file_name}.mp4' + # 创建绝对路径的文件名 + video_full_path = Util.os.path.join(desc_path, video_file_path) + # 检查视频文件是否已经存在 + video_state = Util.os.path.exists(video_full_path) + # 如果视频文件存在,则创建一个已完成的任务 + if video_state: + task_id = self.progress.add_task(description="[ 跳过 ]:", + filename=self.trim_filename(video_file_path, 50), + total=1, completed=1) + Util.log.info(f"repeat task created with ID: {task_id}") + self.progress.update(task_id, completed=1) + # 如果视频文件不存在,则创建一个新的下载任务 + else: + task_id = self.progress.add_task(description="[ 视频 ]:", + filename=self.trim_filename(video_file_path, 50), + start=False) + Util.log.info(f"New task created with ID: {task_id}") + Util.asyncio.create_task(self.download_file(task_id, video_url, video_full_path)) + except IndexError: + # 如果无法提取视频URL,则跳过下载该音乐 + Util.console.print("[ 提示 ]:该视频不可用,无法下载。") + Util.log.warning(f"[ 提示 ]:该视频不可用,无法下载。{IndexError}") + pass + + elif aweme['aweme_type'] == 68: # 如果aweme类型为68,下载图集 + try: + for i, image_dict in enumerate(aweme['images']): + # 提取每个图集的 url_list 的第一项 + image_url = image_dict.get('url_list', [None])[0] + # 创建图片文件名 + image_file_name = f'{ctime_f}_{aweme["desc"]}_image_{i + 1}' + # 创建相对路径的文件名 + image_file_path = f'{image_file_name}.jpg' + # 创建绝对路径的文件名 + image_full_path = Util.os.path.join(desc_path, image_file_path) + # 检查图片文件是否已经存在 + image_state = Util.os.path.exists(image_full_path) + # 如果图片文件存在,则创建一个已完成的任务 + if image_state: + task_id = self.progress.add_task(description="[ 跳过 ]:", + filename=self.trim_filename(image_file_path, 50), + total=1, completed=1) + Util.log.info(f"repeat task created with ID: {task_id}") + self.progress.update(task_id, completed=1) + # 如果图片文件不存在,则创建一个新的下载任务 else: - pass - # 尝试下载图片 - try: - if len(self.desc) > 25: - filename = self.create_time[:10] + self.desc[:25] + "..." - else: - filename = self.create_time[:10] + self.desc - task_id = Util.progress.add_task( - description = "[ 图集 ]:", filename=filename, start=False) - pool.submit(Util.copy_url, task_id, - self.images[i], self.desc, p_url) - except Exception as error: - print('[ 错误 ]:%s\r' % error) - print('[ 提示 ]:发生了点意外!\r') - - -if __name__ == '__main__': - Download() + task_id = self.progress.add_task(description="[ 图集 ]:", + filename=self.trim_filename(image_file_path, 50), + start=False) + Util.log.info(f"New task created with ID: {task_id}") + Util.asyncio.create_task(self.download_file(task_id, image_url, image_full_path)) + except IndexError: + # 如果无法提取图集URL,则跳过下载该音乐 + Util.console.print("[ 提示 ]:该图片不可用,无法下载。") + Util.log.warning(f"[ 提示 ]:该图片不可用,无法下载。{IndexError}") + pass