diff --git a/README.md b/README.md
index 4fc3e8a..9323f52 100644
--- a/README.md
+++ b/README.md
@@ -11,11 +11,13 @@
📑 功能清单
- ✅ 采集小红书图文/视频作品信息
-- ✅ 提取小红书图文/视频作品文件下载地址
+- ✅ 提取小红书图文/视频作品下载地址
- ✅ 下载小红书无水印图文/视频作品文件
- ✅ 自动跳过已下载的作品文件
- ✅ 作品文件完整性处理机制
-- ☑️ 采集作品信息储存至文件
+- ✅ 持久化储存作品信息至文件
+- ☑️ 后台监听剪贴板下载作品
+- ☑️ 支持 API 调用功能
📸 程序截图
@@ -137,6 +139,18 @@ async with XHS(path=path,
| max_retry | int | 请求数据失败时,重试的最大次数 | 5 |
+| record_data | bool | 是否记录作品数据至文件 | false |
+| image_format | str | 图文作品文件名称后缀,例如:jpg、png | webp |
🌐 Cookie
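
The two new README parameters correspond to the `record_data` and `image_format` keyword arguments added to `XHS` in `source/__init__.py` below. A minimal usage sketch; the `from source import XHS` import path and the work URL are assumptions for illustration:

```python
import asyncio

from source import XHS  # assumed import path for this repository


async def main():
    # record_data=True appends each work's metadata to a "<name>.txt" file;
    # image_format sets the image file extension (default "webp").
    async with XHS(record_data=True, image_format="png") as xhs:
        # Placeholder link; anything matching XHS.SHARE or XHS.SHORT works.
        await xhs.extract(
            "https://www.xiaohongshu.com/discovery/item/0123456789abcdef",
            download=True,
        )


asyncio.run(main())
```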
diff --git a/source/Downloader.py b/source/Downloader.py
index 782cca8..330fc25 100644
--- a/source/Downloader.py
+++ b/source/Downloader.py
@@ -4,7 +4,7 @@
from aiohttp import ServerTimeoutError
from rich.text import Text
-from .Html import retry
+from .Html import retry as re_download
__all__ = ['Download']
@@ -26,17 +26,18 @@ def __init__(
headers={"User-Agent": manager.headers["User-Agent"]},
timeout=ClientTimeout(connect=timeout))
self.retry = manager.retry
+ self.image_format = manager.image_format
- async def run(self, urls: list, name: str, type_: int, log, bar):
- if type_ == 0:
+ async def run(self, urls: list, name: str, type_: str, log, bar):
+ if type_ == "v":
await self.__download(urls[0], f"{name}.mp4", log, bar)
- elif type_ == 1:
+ elif type_ == "n":
for index, url in enumerate(urls, start=1):
- await self.__download(url, f"{name}_{index}.png", log, bar)
+ await self.__download(url, f"{name}_{index}.{self.image_format}", log, bar)
else:
raise ValueError
- @retry
+ @re_download
async def __download(self, url: str, name: str, log, bar):
temp = self.temp.joinpath(name)
file = self.folder.joinpath(name)
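
With this change `run()` dispatches on a string type code instead of an int; the codes come from the `XHS.TYPE` mapping introduced in `source/__init__.py` below ("视频" → "v", "图文" → "n"). A sketch of the new contract, assuming a constructed `Download` instance named `downloader`:

```python
# Type codes expected by Download.run() after this change:
#   "v" -> one video URL, saved as "<name>.mp4"
#   "n" -> image URLs, saved as "<name>_1.<image_format>", "<name>_2.<image_format>", ...
#   anything else raises ValueError
TYPE = {"视频": "v", "图文": "n"}  # mirrors XHS.TYPE in source/__init__.py


async def save_work(downloader, urls: list, name: str, work_kind: str, log=None, bar=None):
    # work_kind is the "作品类型" value produced by Explore.run()
    await downloader.run(urls, name, TYPE[work_kind], log, bar)
```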
diff --git a/source/Explore.py b/source/Explore.py
index 0a25d5b..8393694 100644
--- a/source/Explore.py
+++ b/source/Explore.py
@@ -9,6 +9,7 @@ class Explore:
explore_data = compile(
r'"currentTime":\d{13},"note":(.*?)}},"serverRequestInfo"')
time_format = "%Y-%m-%d %H:%M:%S"
+ explore_type = {"video": "视频", "normal": "图文"}
def run(self, html: str) -> dict:
data = self.__get_json_data(html)
@@ -41,14 +42,11 @@ def __extract_tags(container: dict, data: dict):
tags = data.get("tagList", [])
container["作品标签"] = [i.get("name", "") for i in tags]
- @staticmethod
- def __extract_info(container: dict, data: dict):
+ def __extract_info(self, container: dict, data: dict):
container["作品ID"] = data.get("noteId")
container["作品标题"] = data.get("title")
container["作品描述"] = data.get("desc")
- container["作品类型"] = {
- "video": "视频", "normal": "图文"}.get(
- data.get("type"), "未知")
+ container["作品类型"] = self.explore_type.get(data.get("type"), "未知")
container["IP归属地"] = data.get("ipLocation")
def __extract_time(self, container: dict, data: dict):
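
For reference, the mapping that `Explore.run` assembles from these extractor methods now resolves the work type through the class-level `explore_type` table. A sketch of the resulting structure, with placeholder values:

```python
# Keys taken from the __extract_* methods above; values are placeholders.
work = {
    "作品ID": "0123456789abcdef",
    "作品标题": "示例标题",
    "作品描述": "示例描述",
    "作品类型": "图文",  # explore_type.get(..., "未知") -> "视频" / "图文" / "未知"
    "IP归属地": "上海",
    "作品标签": ["标签一", "标签二"],
    # __extract_time adds a publish-time field formatted with time_format
}
```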
diff --git a/source/Html.py b/source/Html.py
index 2507cdb..9925eb9 100644
--- a/source/Html.py
+++ b/source/Html.py
@@ -3,7 +3,7 @@
from aiohttp import ServerDisconnectedError
from aiohttp import ServerTimeoutError
-__all__ = ['Html']
+__all__ = ["Html", "retry"]
def retry(function):
diff --git a/source/Image.py b/source/Image.py
index fa2fb07..5577f65 100644
--- a/source/Image.py
+++ b/source/Image.py
@@ -23,7 +23,7 @@ def __format_image_data(data: list[str]) -> list[dict]:
@staticmethod
def __generate_image_link(token: str) -> str:
- return f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
+ return f"https://sns-img-bd.xhscdn.com/{token}"
def __extract_image_token(self, url: str) -> str:
return self.__generate_image_link(token.group(1)) if (
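
Image links are now plain CDN URLs built from the extracted token, without the `imageView2` PNG-conversion suffix, so the saved file extension is controlled by the new `image_format` option instead. A hypothetical token illustrates the change:

```python
token = "1040g2sg0abcdefg"  # hypothetical token extracted from the original image URL
print(f"https://sns-img-bd.xhscdn.com/{token}")
# Before this change the link was built as:
# f"https://ci.xiaohongshu.com/{token}?imageView2/2/w/format/png"
```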
diff --git a/source/Manager.py b/source/Manager.py
index 2108759..3b27ffe 100644
--- a/source/Manager.py
+++ b/source/Manager.py
@@ -19,7 +19,10 @@ def __init__(
folder: str,
user_agent: str,
cookie: str,
- retry: int):
+ retry: int,
+ record_data: bool,
+ image_format: str,
+ ):
self.root = root
self.temp = root.joinpath("./temp")
self.folder = self.__init_root(root, path, folder)
@@ -34,6 +37,8 @@ def __init__(
"-bcc2-a859e97518bf; unread={%22ub%22:%22655eb3d60000000032033955%22%2C%22ue%22:%22656"
"e9ef2000000003801ff3d%22%2C%22uc%22:29}; cache_feeds=[]"}
self.retry = retry
+ self.record_data = record_data
+ self.image_format = image_format
def __init_root(self, root: Path, path: str, folder: str) -> Path:
if path and (r := Path(path)).is_dir():
@@ -61,9 +66,11 @@ def clean(self):
def filter_name(self, name: str) -> str:
name = self.NAME.sub("_", name)
- return sub(r"_+", "_", name)
+ return sub(r"_+", "_", name).strip("_")
def save_data(self, name: str, data: dict):
+ if not self.record_data:
+ return
with self.folder.joinpath(f"{name}.txt").open("a", encoding="utf-8") as f:
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
content = f"{
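
Two behavioural notes on this hunk: `filter_name` now also trims leading and trailing underscores, and `save_data` becomes a no-op unless the manager was created with `record_data=True`. A standalone sketch of the filename sanitisation; the real `Manager.NAME` pattern is not shown in this diff, so a placeholder pattern for characters illegal in file names is assumed:

```python
from re import compile, sub

NAME = compile(r'[\\/:*?"<>|\s]')  # placeholder; the real pattern lives on Manager


def filter_name(name: str) -> str:
    name = NAME.sub("_", name)
    return sub(r"_+", "_", name).strip("_")


print(filter_name(" 标题: 测试? "))  # -> 标题_测试 (no leading/trailing "_")
```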
diff --git a/source/Settings.py b/source/Settings.py
index 96c8032..b3e4964 100644
--- a/source/Settings.py
+++ b/source/Settings.py
@@ -16,6 +16,8 @@ class Settings:
"timeout": 10,
"chunk": 1024 * 1024,
"max_retry": 5,
+ "record_data": False,
+ "image_format": "webp",
}
encode = "UTF-8-SIG" if system() == "Windows" else "UTF-8"
diff --git a/source/__init__.py b/source/__init__.py
index 4fb213f..a355081 100644
--- a/source/__init__.py
+++ b/source/__init__.py
@@ -38,6 +38,10 @@ class XHS:
SHARE = compile(r"https://www\.xiaohongshu\.com/discovery/item/[a-z0-9]+")
SHORT = compile(r"https://xhslink\.com/[A-Za-z0-9]+")
__INSTANCE = None
+ TYPE = {
+ "视频": "v",
+ "图文": "n",
+ }
def __new__(cls, *args, **kwargs):
if not cls.__INSTANCE:
@@ -54,6 +58,8 @@ def __init__(
timeout=10,
chunk=1024 * 1024,
max_retry=5,
+ record_data=False,
+ image_format="webp",
**kwargs,
):
self.manager = Manager(
@@ -62,7 +68,10 @@ def __init__(
folder_name,
user_agent,
cookie,
- max_retry)
+ max_retry,
+ record_data,
+ image_format,
+ )
self.html = Html(
self.manager.headers,
proxy,
@@ -78,27 +87,21 @@ def __init__(
timeout, )
self.rich_log = self.download.rich_log
- async def __get_image(self, container: dict, html: str, download, log, bar):
- urls = self.image.get_image_link(html)
- # self.rich_log(log, urls) # 调试代码
- name = self.__naming_rules(container)
- if download:
- await self.download.run(urls, name, 1, log, bar)
- container["下载地址"] = urls
- self.manager.save_data(name, container)
+ def __extract_image(self, container: dict, html: str):
+ container["下载地址"] = self.image.get_image_link(html)
+
+ def __extract_video(self, container: dict, html: str):
+ container["下载地址"] = self.video.get_video_link(html)
- async def __get_video(self, container: dict, html: str, download, log, bar):
- url = self.video.get_video_link(html)
- # self.rich_log(log, url) # 调试代码
+ async def __download_files(self, container: dict, download: bool, log, bar):
name = self.__naming_rules(container)
- if download:
- await self.download.run(url, name, 0, log, bar)
- container["下载地址"] = url
+ if download and (u := container["下载地址"]):
+ await self.download.run(u, name, self.TYPE[container["作品类型"]], log, bar)
self.manager.save_data(name, container)
async def extract(self, url: str, download=False, log=None, bar=None) -> list[dict]:
# return # 调试代码
- urls = await self.__deal_links(url)
+ urls = await self.__extract_links(url)
if not urls:
self.rich_log(log, "提取小红书作品链接失败", "bright_red")
else:
@@ -106,7 +109,7 @@ async def extract(self, url: str, download=False, log=None, bar=None) -> list[di
# return urls # 调试代码
return [await self.__deal_extract(i, download, log, bar) for i in urls]
- async def __deal_links(self, url: str) -> list:
+ async def __extract_links(self, url: str) -> list:
urls = []
for i in url.split():
if u := self.SHORT.search(i):
@@ -130,10 +133,14 @@ async def __deal_extract(self, url: str, download: bool, log, bar):
if not data:
self.rich_log(log, f"{url} 提取数据失败", "bright_red")
return {}
- if data["作品类型"] == "视频":
- await self.__get_video(data, html, download, log, bar)
- else:
- await self.__get_image(data, html, download, log, bar)
+ match data["作品类型"]:
+ case "视频":
+ self.__extract_video(data, html)
+ case "图文":
+ self.__extract_image(data, html)
+ case _:
+ data["下载地址"] = []
+ await self.__download_files(data, download, log, bar)
self.rich_log(log, f"完成处理:{url}")
return data
@@ -145,6 +152,9 @@ async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
+ await self.close()
+
+ async def close(self):
self.manager.clean()
await self.html.session.close()
await self.download.session.close()
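
The explicit `close()` method makes it possible to use `XHS` outside an `async with` block, e.g. for long-lived objects such as an API server. A minimal sketch, with the same import-path and placeholder-URL assumptions as above:

```python
import asyncio

from source import XHS  # assumed import path


async def main():
    xhs = XHS(record_data=True)
    try:
        # Placeholder short link matching XHS.SHORT.
        works = await xhs.extract("https://xhslink.com/AbCdEf", download=False)
        print(works)  # list of dicts, one per extracted work
    finally:
        await xhs.close()  # cleans the temp folder and closes both aiohttp sessions


asyncio.run(main())
```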