diff --git a/.gitignore b/.gitignore index b6e4761..2a9335b 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,10 @@ dmypy.json # Pyre type checker .pyre/ + +# custom +.idea +bin/tmp +bin/keys.json +bin/*.json +bin/tasks.py diff --git a/README.md b/README.md index 37118a4..3ced1c1 100644 --- a/README.md +++ b/README.md @@ -1 +1,120 @@ -# tinypng-unlimited \ No newline at end of file +# tinypng-unlimited + +## 介绍 + +### TinyPNG + +TinyPNG使用将WebP, PNG and JPEG图片进行智能有损压缩,该压缩对视觉的影响几乎不可见,但是能显著压缩文件体积,以节省空间储存,方便网络传输。 + +通过邮箱免费申请TinyPNG官方API秘钥,可以获得每月500张图片的免费压缩次数(不限文件大小)。 + +因此,通过API进行图片批量压缩是相对理想的形式,[申请地址]([TinyPNG – Developer API](https://tinypng.com/developers)) + +### tinypng-unlimited + +> 本项目仅供技术研究使用,请勿用于任何商业及非法用途,任何后果作者概不负责! + +**本项目可自动申请API秘钥,以多线程形式批量进行TinyPNG压缩,并附带上传、下载和总体任务的进度条,旨在提供最方便快捷的云压缩功能** + +本项目实现的功能: + +1. 通过多个临时邮箱自动申请TinyPNG官方API秘钥,以实现**无限制使用TinyPNG** +2. **自动切换不可用秘钥**(即将达到500次免费压缩的秘钥) +3. 多线程上传下载图片,**加快批量压缩进度** +4. 可选**使用代理**上传、下载图片 +5. 可选**递归子文件夹**,可通过**正则匹配**需要压缩的文件名 +6. 可选**通过配置文件批量添加**图片文件名、文件夹任务列表 +7. 可选**输出压缩日志**到图片输出文件夹目录 +8. 显示上传、下载和总体任务的**进度条** +9. 为每个压缩后的图片添加压缩标记字节(不影响图片内容),**避免重复压缩** +10. 上传、下载带有**超时时间** +11. **压缩错误自动重试**,超出重试次数输出错误文件列表,下次运行时自动重新压缩 + + + +## 安装 + +方式一: + +1. 下载本项目文件 + +2. 安装依赖 + + ``` + pip install -r requirements.txt + ``` + +方式二: + +1. 下载已编译命令行工具:[TinyPNG-Unlimited.exe](https://github.com/ruchuby/TinyPNG-Unlimited/releases) + + + +### 使用 + +1. 压缩单文件 + ```bash + path\to\your\python main.py file "path\to\your\image" + TinyPNG-Unlimited.exe file "path\to\your\image" + ``` + +2. 压缩单文件夹 + ```bash + path\to\your\python main.py dir "path\to\your\image\dir" + TinyPNG-Unlimited.exe dir "path\to\your\image\dir" + ``` + +3. 使用配置文件批量压缩 + ```bash + path\to\your\python main.py tasks "path\to\tasks.json" + TinyPNG-Unlimited.exe dir "path\to\tasks.json" + ``` + tasks-emample.json: + + ```json + { + "file_tasks": ["D:\\1.jpg", "D:\\2.jpg"], + "dir_tasks": ["D:\\dir1", "D:\\dir2"] + } + ``` + + 参考 [tasks-help.txt](https://github.com/ruchuby/TinyPNG-Unlimited/blob/develop/bin/tasks-help.txt) + +4. 申请API秘钥 + + 程序运行时会自动申请秘钥,但也可以通过此方式再次申请 + ```bash + path\to\your\python main.py apply 4 + TinyPNG-Unlimited.exe apply 4 + ``` + +5. 重新排列API秘钥顺序 + + 依次请求获取本地储存的API秘钥压缩次数,重新排列秘钥顺序 + + ```bash + path\to\your\python main.py rearrange + TinyPNG-Unlimited.exe rearrange + ``` + +6. 更多细节请使用命令行帮助,或者打开项目源码查看 + + ```bash + TinyPNG-Unlimited.exe -h + TinyPNG-Unlimited.exe file -h + TinyPNG-Unlimited.exe dir -h + TinyPNG-Unlimited.exe tasks -h + TinyPNG-Unlimited.exe apply -h + TinyPNG-Unlimited.exe rearrange -h + ``` + + + + +## 截图 + +image-20221108212239001 + +image-20221108212228078 + + diff --git a/bin/main.py b/bin/main.py new file mode 100644 index 0000000..949af49 --- /dev/null +++ b/bin/main.py @@ -0,0 +1,285 @@ +import argparse +import json +import os +import sys +import time + +from loguru import logger +from tqdm import tqdm + +# 添加包路径进入环境变量 +cur_file_path = sys.argv[0] +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(cur_file_path), '..'))) +os.system('title=TinyPng无限制压缩图片') + +from tinypng_unlimited import KeyManager, TinyImg + + +def init(proxy=None): + logger.info('TinyPng正在初始化') + KeyManager.init(os.path.dirname(cur_file_path)) + + tmp_dir = os.path.join(os.path.dirname(cur_file_path), 'tmp') + if os.path.exists(tmp_dir): + os.removedirs(tmp_dir) # 清空之前的临时下载文件夹 + + if not len(KeyManager.Keys.available): + logger.error('无可用秘钥,请稍后重试') + exit() + + TinyImg.set_key(KeyManager.Keys.available[0]) + + if proxy is not None: + TinyImg.set_proxy(proxy) + + logger.success('TinyPng初始化成功') + + +def compress_error_files(file_list): + times = 0 + logger.warning('存在压缩失败图片({}):\n{}', len(file_list), file_list) + while times < 5: + times += 1 + logger.info('1s后对上述文件列表内文件进行压缩(第{}次)', times) + time.sleep(1) + res = TinyImg.compress_from_file_list(file_list) + tqdm.write('') + logger.debug('压缩报告基本信息:\n{}', json.dumps(res['basic'], ensure_ascii=False, indent=2)) + # 压缩失败文件不考虑输出日志到文件 + if res['basic']['error_count'] > 0: # 仍然存在压缩失败的文件 + file_list = res['error_files'] + logger.warning('存在压缩失败图片({}):\n{}', len(file_list), file_list) + else: + return + + file_path = os.path.abspath(os.path.join(os.path.dirname(cur_file_path), 'error_files.json')) + try: + with open(file_path, encoding='utf-8') as f: + old_error_files = json.load(f) + except Exception: + old_error_files = [] + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(old_error_files + file_list, f, ensure_ascii=False, indent=2) + logger.error('超过压缩失败重试次数{}, 压缩失败图片路径已保存', times) + + +def compress_cover(input_type: str, file_list: list = None, dir_path: str = None, + proxy: str = None, log: bool = False): + if input_type == 'dir': + if not len(dir_path): + return False + elif input_type == 'file_list': + if not len(file_list): + return False + else: + raise Exception('input_type must be "dir" or "file_list"') + + if proxy: + logger.info('配置: 使用代理上传图片: {}', proxy) + if log: + logger.info('配置: 压缩完成后输出压缩日志文件') + + try: + if input_type == 'dir': + logger.info('1s后开始对文件夹内图片进行压缩: {}', dir_path) + time.sleep(1) + res = TinyImg.compress_from_dir(dir_path) + else: + logger.info('1s后开始对图片列表进行压缩: {}', file_list) + time.sleep(1) + res = TinyImg.compress_from_file_list(file_list) + tqdm.write('') + logger.debug('压缩报告基本信息:\n{}', json.dumps(res['basic'], ensure_ascii=False, indent=2)) + + # 仅文件夹才输出日志 + if input_type == 'dir' and log: + log_path = os.path.abspath(os.path.join(dir_path, 'log.json')) + with open(log_path, 'w', encoding='utf-8') as f: + json.dump(res, f, ensure_ascii=False, indent=2) + logger.success('压缩日志已输出: {}', log_path) + + if res['basic']['error_count'] > 0: # 存在压缩失败的文件 + compress_error_files(res['error_files']) + + except Exception as e: + logger.error(e) + return True + + +def compress_cover_dir(dir_path: str, proxy: str = None, log: bool = False): + """ + 压缩文件夹内图片(覆盖) + :param dir_path: 文件夹路径 + :param proxy: 代理地址 + :param log: 是否输出日志到该文件夹 + :return: bool 是否进行了压缩 + """ + return compress_cover(input_type='dir', dir_path=dir_path, proxy=proxy, log=log) + + +def compress_cover_file_list(file_list: list, proxy: str = None): + """ + 压缩文件列表内图片(覆盖) + :param file_list: 文件路径列表 + :param proxy: 代理地址 + :return: bool 是否进行了压缩 + """ + return compress_cover(input_type='file_list', file_list=file_list, proxy=proxy) + + +def character_drawing(): + tqdm.write(r''' + _______ ____ _ ________ __ __ ___ _ __ __ + /_ __(_)___ __ __/ __ \/ | / / ____/ / / / /___ / (_)___ ___ (_) /____ ____/ / + / / / / __ \/ / / / /_/ / |/ / / __ / / / / __ \/ / / __ `__ \/ / __/ _ \/ __ / + / / / / / / / /_/ / ____/ /| / /_/ / / /_/ / / / / / / / / / / / / /_/ __/ /_/ / +/_/ /_/_/ /_/\__, /_/ /_/ |_/\____/ \____/_/ /_/_/_/_/ /_/ /_/_/\__/\___/\__,_/ + /____/ + ''') + + +def check_error_files(proxy=None): + path = os.path.abspath(os.path.join(os.path.dirname(cur_file_path), 'error_files.json')) + try: + with open(path, encoding='utf-8') as f: + file_list = json.load(f) + except: + return + + if isinstance(file_list, list) and len(file_list) > 0: + logger.info('检测到压缩失败图片路径列表, 1s后开始压缩') + os.remove(path) + compress_cover_file_list(file_list, proxy) + logger.success('文件列表压缩完成') + character_drawing() + os.system('echo \7') # 输出到终端时可以发出蜂鸣作为一种提醒 + + +def command_dir(args): + if args.dir is None: + args.dir = input('输入图片文件夹路径(为空则结束程序):').strip('"') + + character_drawing() + init(proxy=args.proxy) + check_error_files(args.proxy) + + while compress_cover_dir(args.dir, args.proxy, args.log): + tqdm.write('=' * 60) + if args.recur: + # 递归子文件夹 + for root, dirs, files in os.walk(args.dir): + for dir_path in dirs: + dir_path = os.path.join(root, dir_path) + logger.info('正在递归子文件夹: {}', dir_path) + compress_cover_dir(dir_path, args.proxy, args.log) + tqdm.write('=' * 60) + character_drawing() + os.system('echo \7') # 输出到终端时可以发出蜂鸣作为一种提醒 + args.dir = input('输入下一个图片文件夹路径(为空则结束程序):').strip('"') + + +def command_file(args): + character_drawing() + init(proxy=args.proxy) + check_error_files(args.proxy) + + compress_cover_file_list([args.file], args.proxy) + logger.success('文件压缩完成') + + +def command_tasks(args): + if os.path.exists(args.path): + with open(args.path, encoding='utf-8') as f: + tasks = json.load(f) + else: + logger.error('{} does not exist', args.path) + return + + character_drawing() + init(proxy=args.proxy) + check_error_files(args.proxy) + + length = 1 if 'file_tasks' in tasks else 0 + length += len(tasks['dir_tasks']) if 'dir_tasks' in tasks else 0 + with tqdm(desc='[总体进度]', unit='任务', total=length, file=sys.stdout, ascii=' ▇', + colour='magenta', leave=False, ncols=120, position=5) as bar: + if 'file_tasks' in tasks: + compress_cover_file_list(tasks['file_tasks'], args.proxy) + logger.success('文件列表压缩完成') + bar.update() + character_drawing() + os.system('echo \7') # 输出到终端时可以发出蜂鸣作为一种提醒 + if 'dir_tasks' in tasks: + for dir_task in tasks['dir_tasks']: + compress_cover_dir(dir_task, args.proxy, args.log) + if args.recur: + # 递归子文件夹 + for root, dirs, files in os.walk(dir_task): + for dir_path in dirs: + dir_path = os.path.join(root, dir_path) + logger.info('正在递归子文件夹: {}', dir_path) + compress_cover_dir(dir_path, args.proxy, args.log) + logger.success('文件夹列表压缩完成') + tqdm.write('=' * 60) + bar.update() + character_drawing() + os.system('echo \7') # 输出到终端时可以发出蜂鸣作为一种提醒 + tqdm.write('') + + +def command_apply(args): + KeyManager.working_dir = os.path.dirname(cur_file_path) + KeyManager.load_keys() + KeyManager.apply_store_key(args.num) + + +def command_rearrange(args): + KeyManager.working_dir = os.path.dirname(cur_file_path) + KeyManager.rearrange_keys() + + +def main(): + # 命令行参数解析 + parser = argparse.ArgumentParser(description='Tinify Your Images Unlimited! ' + 'All compressed images will cover themselves.') + subparsers = parser.add_subparsers(metavar='') + # dir + dir_parser = subparsers.add_parser('dir', help='Compress images from dir or input the path later') + dir_parser.add_argument('-d', '--dir', type=str, help='The dir where your images are.') + dir_parser.set_defaults(func=command_dir) + # file + file_parser = subparsers.add_parser('file', help='Compress image from file') + file_parser.add_argument('file', type=str, help='The path where the image is.') + file_parser.add_argument('-p', '--proxy', type=str, help='The proxy used on uploading images.') + file_parser.set_defaults(func=command_file) + # tasks + tasks_parser = subparsers.add_parser('tasks', help='Compress images from tasks.json') + tasks_parser.add_argument('path', type=str, help='The path where the tasks.json is.') + tasks_parser.set_defaults(func=command_tasks) + + for p in dir_parser, tasks_parser: + p.add_argument('-p', '--proxy', type=str, help='The proxy used on uploading images.') + p.add_argument('-r', '--recur', action='store_true', help='Whether to recurse the dir.') + p.add_argument('-l', '--log', action='store_true', help='Whether to output compression log in images dir.') + + # apply + apply_parser = subparsers.add_parser('apply', help='Apply TinyPNG API key.') + apply_parser.add_argument('num', type=int, nargs='?', default=4, + help='The number of times to apply a TinyPNG API key.') + apply_parser.set_defaults(func=command_apply) + + # rearrange + apply_parser = subparsers.add_parser('rearrange', + help='Rearrange API keys in keys.json by compression count.') + apply_parser.set_defaults(func=command_rearrange) + + args = parser.parse_args() + if 'func' not in args: + parser.print_help() + return + args.func(args) + input('回车退出') + + +if __name__ == '__main__': + main() diff --git a/bin/tasks-help.txt b/bin/tasks-help.txt new file mode 100644 index 0000000..b352d73 --- /dev/null +++ b/bin/tasks-help.txt @@ -0,0 +1,27 @@ +Use json create the tasks file like "my_tasks.json". +The json data must be an object with keys: "file_tasks", "dir_tasks". +Using both of them or one of them is ok. + +Create the tasks file, and then you can use "python main.py tasks path\to\my_tasks.json" +or "TinyPNG-Unlimited.exe tasks path\to\my_tasks.json" to compress images conveniently. + +By the way, the compressed images will cover themselves. + + +======================================================= +example 1: +{ + "file_tasks": ["D:\\1.jpg", "D:\\2.jpg"], + "dir_tasks": ["D:\\dir1", "D:\\dir2"] +} +======================================================= +example 2: +{ + "file_tasks": ["D:\\1.jpg", "D:\\2.jpg"] +} +======================================================= +example 3: +{ + "dir_tasks": ["D:\\dir1", "D:\\dir2"] +} +======================================================= \ No newline at end of file diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..6adc16f Binary files /dev/null and b/icon.ico differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..53313ca --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +certifi==2022.9.24 +charset-normalizer==2.1.1 +colorama==0.4.6 +idna==3.4 +loguru==0.6.0 +requests==2.28.1 +tinify==1.6.0 +tqdm==4.64.1 +urllib3==1.26.12 +win32-setctime==1.1.0 diff --git a/tinypng_unlimited/__init__.py b/tinypng_unlimited/__init__.py new file mode 100644 index 0000000..0322b2e --- /dev/null +++ b/tinypng_unlimited/__init__.py @@ -0,0 +1,12 @@ +__all__ = ['TinyImg', 'KeyManager'] + +from loguru import logger +from tqdm import tqdm + +# 避免冲突 +logger.remove() +logger.add(lambda msg: tqdm.write(msg, end=''), colorize=True, + format='{time:YYYY-MM-DD hh:mm:ss}\t| {level:9}| {message}') + +from tinypng_unlimited.tiny_img import TinyImg +from tinypng_unlimited.key_manager import KeyManager diff --git a/tinypng_unlimited/errors.py b/tinypng_unlimited/errors.py new file mode 100644 index 0000000..a47920a --- /dev/null +++ b/tinypng_unlimited/errors.py @@ -0,0 +1,48 @@ +from typing import Any + + +class CustomException(Exception): + """ + 自定义异常基类 + """ + + def __init__(self, name: str, msg: str, detail: Any = None): + super().__init__(name, msg, detail) + self.msg = msg + self.detail = detail + + +class SnapMailException(CustomException): + """ + 接受邮件相关错误 + """ + + def __init__(self, msg: str, detail: Any = None): + super().__init__('接受邮件相关错误', msg, detail) + + +class ApplyKeyException(CustomException): + """ + 申请新秘钥相关错误 + """ + + def __init__(self, msg: str, detail: Any = None): + super().__init__('申请新秘钥相关错误', msg, detail) + + +class ProxyManagerException(CustomException): + """ + 调用代理相关错误 + """ + + def __init__(self, msg: str, detail: Any = None): + super().__init__('调用代理相关错误', msg, detail) + + +class CompressException(CustomException): + """ + 压缩图片相关错误 + """ + + def __init__(self, msg: str, detail: Any = None): + super().__init__('压缩图片相关错误', msg, detail) diff --git a/tinypng_unlimited/key_manager.py b/tinypng_unlimited/key_manager.py new file mode 100644 index 0000000..83487fa --- /dev/null +++ b/tinypng_unlimited/key_manager.py @@ -0,0 +1,193 @@ +import json +import os +import re +import time + +import requests +from loguru import logger +from requests import Timeout + +from tinypng_unlimited.errors import SnapMailException, ApplyKeyException +from tinypng_unlimited.snapmail import SnapMail + + +class KeyManager: + working_dir: str + + class Keys: + available: list + unavailable: list + + @classmethod + def load(cls, obj: dict): + cls.available = obj['available'] if 'available' in obj else [] + cls.unavailable = obj['unavailable'] if 'unavailable' in obj else [] + + @classmethod + def init(cls, working_dir): + """ + 秘钥初始化,请在所有需要秘钥的操作之前执行 + """ + cls.working_dir = working_dir + cls.load_keys() + if len(cls.Keys.available) < 3: + logger.warning('当前可用秘钥少于3条,优先申请新秘钥') + cls.apply_store_key() + + @classmethod + def load_keys(cls): + """ + 加载本地存储的秘钥 + """ + path = os.path.abspath(os.path.join(cls.working_dir, 'keys.json')) + if not os.path.exists(path): + cls.Keys.load({}) + else: + with open(path, 'r', encoding='utf-8') as f: + cls.Keys.load(json.load(f)) + + @classmethod + def store_key(cls): + """ + 秘钥保存到本地 + """ + path = os.path.abspath(os.path.join(cls.working_dir, 'keys.json')) + with open(path, 'w', encoding='utf-8') as f: + json.dump({ + "available": cls.Keys.available, + "unavailable": cls.Keys.unavailable + }, f, ensure_ascii=False, indent=4, separators=(',', ':')) + + @staticmethod + def get_api_count(s, key): + url = 'https://api.tinify.com/shrink' + retry = 0 + logger.info('正在获取秘钥可用性信息...', key) + while True: + try: + res = s.post(url, auth=('api', key)) + return int(res.headers.get('compression-count')) + except Exception as e: + retry += 1 + if retry > 3: # 最多再重试3次(总共4次) + raise e + time.sleep(1) + + @classmethod + def rearrange_keys(cls): + path = os.path.abspath(os.path.join(cls.working_dir, 'keys.json')) + if not os.path.exists(path): + keys = {"available": [], "unavailable": []} + else: + with open(path, 'r', encoding='utf-8') as f: + keys = json.load(f) + out = {"available": [], "unavailable": []} + + with requests.Session() as s: + for type_name in ('available', 'unavailable'): + for index, key in enumerate(keys[type_name]): + count = cls.get_api_count(s, key) + out['available' if count < 490 else 'unavailable'].append((keys[type_name][index], count)) + + for type_name in ('available', 'unavailable'): + out[type_name].sort(key=lambda item: item[1], reverse=True) + logger.info('统计信息:', type_name) + logger.info(json.dumps(out[type_name], indent=2)) + out[type_name] = [x[0] for x in out[type_name]] + + cls.Keys.load(out) + cls.store_key() + logger.success('秘钥已按统计信息重新排列') + + @classmethod + def next_key(cls) -> str: + """ + 删除当前秘钥并返回下一条 + """ + cls.load_keys() + + if len(cls.Keys.available) < 3: + logger.warning('可用秘钥少于3条,优先申请新秘钥') + cls.apply_store_key() + + if not len(cls.Keys.available): + raise Exception('无可用秘钥,请申请后重试') + cls.Keys.unavailable.append(cls.Keys.available.pop(0)) + cls.store_key() + logger.debug('秘钥已切换,等待载入') + return cls.Keys.available[0] + + @classmethod + def _apply_api_key(cls) -> str: + """ + 申请新秘钥 + """ + with requests.Session() as session: + # 注册新账号(发送确认邮件) + mail = SnapMail.create_new_mail() + res = session.post('https://tinypng.com/web/api', json={ + "fullName": mail[:mail.find('@')], + "mail": mail + }) + + if res.status_code == 429: + raise ApplyKeyException('新账号注册过于频繁', res.text) + if res.status_code != 200 or res.text != '{}': + raise ApplyKeyException('新账号注册未知错误', res.text) + logger.info('注册邮件已发送至:{}', mail) + time.sleep(5) # 5s后开始 + + # 接收邮件,提取链接 + try: + res_json: dict = SnapMail.get_email_list(session, 1) + match = re.search(r'(https://tinify.com/login\?token=.*?api)', res_json[0]['text']) + url = match.group(1) + except SnapMailException as e: + raise ApplyKeyException('注册邮件接收失败', e) + except Exception as e: + raise ApplyKeyException('注册链接提取失败', e) + logger.info('注册链接提取成功') + + # 访问控制台,生成秘钥 + retry = 0 + while True: + try: + session.get(url) + auth = (session.get('https://tinify.com/web/session')).json()['token'] # 获取鉴权 + headers = { + 'authorization': f"Bearer {auth}" + } + session.post('https://api.tinify.com/api/keys', headers=headers) # 添加新秘钥 + res = session.get('https://api.tinify.com/api', headers=headers) # 获取秘钥 + key = res.json()['keys'][-1]['key'] + break + except Exception as e: + retry += 1 + if retry <= 3: + logger.error('新秘钥生成失败, 3s后进行第{}次重试 {}', retry, e) + time.sleep(3) + else: + raise ApplyKeyException(f'超出重试次数, 新秘钥生成失败: {url}', e) + + logger.success('新秘钥生成成功') + return key + + @classmethod + def apply_store_key(cls, times=None): + """ + 申请并保存秘钥 + """ + + # 允许申请次数(包括失败重试) + times = 4 - len(cls.Keys.available) if times is None else times + while times > 0: + try: + times -= 1 + logger.info('正在申请新秘钥,剩余次数: {}', times) + key = cls._apply_api_key() + cls.Keys.available.append(key) + cls.store_key() + except Timeout as e: + logger.error("请求超时: {} - {}({})", e.request.method, e.request.url, bytes.decode(e.request.content)) + except Exception as e: + logger.error(e) diff --git a/tinypng_unlimited/snapmail.py b/tinypng_unlimited/snapmail.py new file mode 100644 index 0000000..ef01dcf --- /dev/null +++ b/tinypng_unlimited/snapmail.py @@ -0,0 +1,56 @@ +import time +from random import sample +from loguru import logger +from requests import Session + +from tinypng_unlimited.errors import SnapMailException + + +class SnapMail: + BASE_URL = 'https://www.snapmail.cc/' + mail: str = None + + @classmethod + def create_new_mail(cls) -> str: + cls.mail = ''.join(sample('zyxwvutsrqponmlkjihgfedcba', 16)) + '@snapmail.cc' + return cls.mail + + @classmethod + def session_get(cls, session: Session, url: str, params: dict = None) -> dict: + if cls.mail is None: + cls.create_new_mail() + + retry = 0 + while True: + res = session.get(cls.BASE_URL + url.strip('/'), params=params) + if res.status_code != 200: + try: + err = res.json()['error'] + if err.find('Email was not found') > -1: + raise SnapMailException('邮箱内无任何邮件', err) + elif err.find('Please try again') > -1: + raise SnapMailException('邮箱请求过频繁', err) + # 其他错误 + logger.error(err) + except SnapMailException as e: + # 明确错误 + err = e + logger.error(err) + except Exception: + # 未知错误 + err = res.text + logger.error('未知邮箱请求错误 {}', err) + + retry += 1 + if retry <= 3: + logger.info(f'等待10s后进行第{retry}次重试') + time.sleep(10) + else: + raise SnapMailException('超过重试次数', 3) + else: + # 状态码200则返回 + return res.json() + + @classmethod + def get_email_list(cls, session: Session, count: int = None): + return cls.session_get(session, f'emailList/{cls.mail}', count if count is None else {'count': count}) diff --git a/tinypng_unlimited/tiny_img.py b/tinypng_unlimited/tiny_img.py new file mode 100644 index 0000000..3dc83ed --- /dev/null +++ b/tinypng_unlimited/tiny_img.py @@ -0,0 +1,258 @@ +import os +import re +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from shutil import move +from threading import RLock + +import tinify +from loguru import logger +from requests import Session +from tqdm import tqdm +from tqdm.utils import CallbackIOWrapper + +import tinypng_unlimited # 不使用from import防止交叉引用 +from tinypng_unlimited.errors import CompressException + + +class TinyImg: + _lock: RLock = RLock() + _session: Session = Session() + tmp_dir: str + + @classmethod + def set_key(cls, key): + """ + 设置新秘钥并进行验证 + """ + + with cls._lock: # 加锁避免多个线程尝试切换秘钥 + cls.tmp_dir = os.path.abspath(os.path.join(tinypng_unlimited.KeyManager.working_dir, 'tmp')) + logger.debug('正在载入秘钥: {}', key) + tinify.key = key + tinify.validate() + logger.success('秘钥已载入,当前秘钥可用性: [{}/500]', cls.compression_count()) + cls.check_compression_count() + + @classmethod + def set_proxy(cls, proxy): + tinify.proxy = proxy + + @classmethod + def to_file_save(cls, path, url, timeout=30): + """ + 安全的下载文件并保存到指定路径 + :param path: 路径 + :param url: 图片下载链接 + :param timeout: 下载超时 + """ + file_name = os.path.basename(path) + if not os.path.exists(cls.tmp_dir): + os.mkdir(cls.tmp_dir) + tmp_path = os.path.abspath(os.path.join(cls.tmp_dir, f'{os.path.basename(path)}_{round(time.time())}')) + res = cls._session.get(url, stream=True, timeout=timeout) + file_size = int(res.headers.get('content-length', 0)) + with tqdm(file=sys.stdout, desc=f'[下载进度]: {file_name}', colour='red', ncols=120, leave=False, + ascii=' ▇', total=file_size, unit="B", unit_scale=True, unit_divisor=1024) as bar: + with open(tmp_path, 'wb') as f: + wrapped_file = CallbackIOWrapper(bar.update, f, 'write') + for data in res.iter_content(2048): + wrapped_file.write(data) + f.write(b'tiny') + logger.info('已为图片添加压缩标记tiny: {}', file_name) + move(tmp_path, path) + + @classmethod + def compression_count(cls) -> int: + """ + api调用次数 + """ + with cls._lock: + if tinify.compression_count is None: + tinify.validate() + return tinify.compression_count + + @classmethod + def check_compression_count(cls): + """ + 检测秘钥是否限额,限额则替换为下一条 + """ + count = cls.compression_count() + # logger.debug('当前秘钥可用性: [{}/500]', count) + if count >= 490: # 即将达到限额,更换新秘钥(多线程,提前留好余量) + logger.warning('当前秘钥即将达到限额: [{}/500], 正在切换新秘钥', count) + cls.set_key(tinypng_unlimited.KeyManager.next_key()) + + @classmethod + def check_if_compressed(cls, path) -> bool: + """ + 检验图片是否被本程序标记为压缩 + """ + with open(path, 'rb') as f: + f.seek(-4, 2) + return f.read(4) == b'tiny' + + @classmethod + def upload_from_file(cls, f, timeout=60) -> str: + """ + 重写库方法添加超时参数,上传图片,返回云端压缩后图片链接 + :param timeout: 服务器响应超时时间,注意此时间在每次服务器做出任何响应时重置,所以不是整个请求和响应的时间 + :param f: 文件对象 + """ + s: Session = tinify.get_client().session + res = s.post('https://api.tinify.com/shrink', data=f, timeout=timeout) + count = res.headers.get('compression-count') + tinify.compression_count = int(count) + return res.headers.get('location') + + @classmethod + def compress_from_file(cls, path, new_path, check_compressed=True, + upload_timeout=None, download_timeout=None) -> tuple: + """ + 压缩图片文件 + :param path: 文件路径 + :param new_path: 新文件路径 + :param check_compressed: 是否检查压缩标记 + :param upload_timeout: 上传响应超时时间,默认60s + :param download_timeout: 下载响应超时时间,默认30s + :return: (旧大小,新大小,压缩到原来的百分比) + """ + old_size = os.path.getsize(path) + file_name = os.path.basename(path) + if check_compressed and cls.check_if_compressed(path): + logger.info('图片已带有压缩标记,不做压缩处理: {}', file_name) + time.sleep(0.5) # 似乎返回值太快会对多线程任务造成影响 + return file_name, old_size, old_size, '100.0%' + + retry = 0 + while True: + try: + # 加锁保证只有一个线程能进行检查(避免多线程同时检查,同时切换api) + # 但是切换秘钥会中断其他请求,因为tinify库中是共享同一个client + with cls._lock: + cls.check_compression_count() # 检验压缩次数是否足够 + old_key = tinify.key + with tqdm(file=sys.stdout, desc=f'[上传进度]: {file_name}', colour='green', ncols=120, leave=False, + ascii=' ▇', total=old_size, unit="B", unit_scale=True, unit_divisor=1024) as bar: + logger.info('正在上传图片至云端压缩[{}]: {}', cls._byte_converter(old_size), file_name) + with open(path, "rb") as f: + wrapped_file = CallbackIOWrapper(bar.update, f, "read") + url = cls.upload_from_file(wrapped_file, timeout=upload_timeout) + # 上传完成得到图片链接,并更新了api调用次数 + with cls._lock: + # 新旧秘钥切换时,使用旧秘钥上传图片的响应会覆盖新秘钥的值,所以需要刷新一下 + if tinify.key != old_key: + tinify.validate() + logger.success('云端压缩成功,正在下载: {}', file_name) + logger.info('当前秘钥可用性: [{}/500]', cls.compression_count()) + cls.to_file_save(new_path, url, timeout=download_timeout) + new_size = os.path.getsize(new_path) + return file_name, old_size, new_size, f'{round(100 * new_size / old_size, 2)}%' + except Exception as e: + retry += 1 + if retry <= 3: + logger.warning('重试压缩图片(第{}次): {}, 错误信息: {}', retry, file_name, e) + else: + raise CompressException('超出压缩重试次数', {'path': path, 'err': e}) + + @classmethod + def compress_from_file_list(cls, file_list, new_dir=None, upload_timeout=None, download_timeout=None) -> dict: + """ + 批量压缩多个文件 + :param file_list: 文件路径列表 + :param new_dir: 输出文件夹 + :param upload_timeout: 上传响应超时时间,默认60s + :param download_timeout: 下载响应超时时间,默认30s + :return: 压缩情况报告 + """ + + if new_dir and not os.path.exists(new_dir): + os.makedirs(new_dir) + + success_count = 0 + old_size = new_size = 0 # python不用担心大数运算溢出问题 + error_files, success_files = [], [] + file_num = len(file_list) + + logger.info('待压缩图片数量: {}', file_num) + + thread_num = 4 + # 4核心理论可以8线程,但是上传速度才是决速步 + with ThreadPoolExecutor(thread_num) as pool: + with tqdm(desc='[任务进度]', unit='份', total=file_num, file=sys.stdout, ascii=' ▇', + colour='yellow', leave=False, ncols=120, position=thread_num) as bar: + future_list = [] + for old_path in file_list: + file_name = os.path.basename(old_path) + # 默认下覆盖原文件 + new_path = os.path.abspath(os.path.join(new_dir, file_name)) if new_dir else old_path + future_list.append(pool.submit(cls.compress_from_file, old_path, new_path, + upload_timeout, download_timeout)) + + for future in as_completed(future_list): + try: + info = future.result() + # 压缩成功则统计信息 + old_size += info[1] + new_size += info[2] + success_count += 1 + success_files.append( + (info[0], cls._byte_converter(info[1]), cls._byte_converter(info[2]), info[3]) + ) + logger.success('图片压缩完成: {}', info[0]) + except CompressException as e: + error_files.append(e.detail['path']) + logger.error('压缩图片失败: {} {}', os.path.basename(e.detail['path']), e) + except Exception as e: + logger.error('压缩图片未知错误 {}', e) + bar.update() + bar_info = bar.format_dict + + compression = f'{round(100 * new_size / old_size, 2)}%' if old_size else '100%' + return { + 'basic': { + 'file_num': file_num, 'success_count': success_count, + 'error_count': len(error_files), + 'time': '{:.2f} s'.format(bar_info['elapsed']), 'speed': '{:.2f} 份/s'.format(bar_info['rate']), + 'output_size': cls._byte_converter(new_size), 'input_size': cls._byte_converter(old_size), + 'compression': compression, 'output_dir': '覆盖原文件' if new_dir is None else new_dir, + }, + 'error_files': error_files, + 'success_files': success_files, + } + + @classmethod + def compress_from_dir(cls, dir_path, new_dir=None, reg=r'.*\.(jpe?g|png|svga)$') -> dict: + """ + 压缩文件夹内图片 + :param dir_path: 文件夹路径 + :param new_dir: 输出路径(None则覆盖原文件) + :param reg: 文件名正则匹配 + :return: 压缩情况报告 + """ + if not os.path.exists(dir_path): + raise CompressException('源文件夹不存在', dir_path) + + # 默认覆盖原文件 + if new_dir and not os.path.exists(new_dir): + os.makedirs(new_dir) + + file_list = [os.path.abspath(os.path.join(dir_path, f)) for f in os.listdir(dir_path) if + re.match(reg, f, re.IGNORECASE)] + + if not len(file_list): + raise CompressException('文件夹内无任何匹配文件', dir_path) + + res = cls.compress_from_file_list(file_list, new_dir) + res['input_dir'] = dir_path + return res + + @staticmethod + def _byte_converter(byte_num) -> str: + if byte_num < 1024: # 比特 + return '{:.2f} B'.format(byte_num) # 字节 + elif 1024 <= byte_num < 1024 * 1024: + return '{:.2f} KB'.format(byte_num / 1024) # 千字节 + else: + return '{:.2f} MB'.format(byte_num / 1024 / 1024) # 兆字节