diff --git a/.github/workflows/gitlab-sync.yml b/.github/workflows/gitlab-sync.yml index 58fd327..1379e49 100644 --- a/.github/workflows/gitlab-sync.yml +++ b/.github/workflows/gitlab-sync.yml @@ -2,8 +2,8 @@ name: sync2gitlab on: push: - branches: - - main + # branches: + # - main jobs: repo-sync: env: diff --git a/app/__init__.py b/app/__init__.py index a2e1997..65dbbf6 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -5,7 +5,6 @@ from starlette.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from contextlib import asynccontextmanager - from app.controller import controller_router from app.utils.loger import log from app.config import MCIMConfig @@ -13,6 +12,8 @@ from app.database._redis import ( init_redis_aioengine, close_aio_redis_engine, + init_sync_queue_redis_engine, + close_sync_queue_redis_engine, ) from app.utils.response_cache import Cache from app.utils.response_cache import cache @@ -26,6 +27,7 @@ @asynccontextmanager async def lifespan(app: FastAPI): app.state.aio_redis_engine = init_redis_aioengine() + init_sync_queue_redis_engine() await app.state.aio_redis_engine.flushall() app.state.aio_mongo_engine = init_mongodb_aioengine() await setup_async_mongodb(app.state.aio_mongo_engine) @@ -36,6 +38,7 @@ async def lifespan(app: FastAPI): yield await close_aio_redis_engine() + await close_sync_queue_redis_engine() APP = FastAPI( @@ -58,8 +61,8 @@ async def lifespan(app: FastAPI): # 强制同步中间件 force=True APP.add_middleware(ForceSyncMiddleware) -# Etag 中间件 -APP.add_middleware(EtagMiddleware) +# # Etag 中间件 +# APP.add_middleware(EtagMiddleware) # 统计 Trustable 请求 APP.add_middleware(CountTrustableMiddleware) diff --git a/app/config/redis.py b/app/config/redis.py index 3feafd2..90d093c 100644 --- a/app/config/redis.py +++ b/app/config/redis.py @@ -14,12 +14,11 @@ class RedisDatabaseModel(BaseModel): tasks_queue: int = 0 # dramatiq tasks info_cache: int = 1 # response_cache and static info rate_limit: int = 3 # rate limit - + sync_queue: int = 4 # sync queue class RedisdbConfigModel(BaseModel): host: str = "redis" port: int = 6379 - auth: bool = True user: Optional[str] = None password: Optional[str] = None database: RedisDatabaseModel = RedisDatabaseModel() @@ -27,7 +26,6 @@ class RedisdbConfigModel(BaseModel): class SyncRedisdbConfigModel(BaseModel): host: str = "sync_redis" port: int = 6379 - auth: bool = True user: Optional[str] = None password: Optional[str] = None diff --git a/app/controller/curseforge/v1/__init__.py b/app/controller/curseforge/v1/__init__.py index 8f72a7b..fb871ee 100644 --- a/app/controller/curseforge/v1/__init__.py +++ b/app/controller/curseforge/v1/__init__.py @@ -1,6 +1,6 @@ from fastapi import APIRouter, Request, BackgroundTasks -from typing import List, Optional, Union -from pydantic import BaseModel +from typing import List, Optional, Union, Annotated +from pydantic import BaseModel, Field from odmantic import query from enum import Enum import time @@ -14,6 +14,11 @@ sync_fingerprints, sync_categories, ) +from app.sync_queue.curseforge import ( + add_curseforge_modIds_to_queue, + add_curseforge_fileIds_to_queue, + add_curseforge_fingerprints_to_queue, +) from app.models.database.curseforge import Mod, File, Fingerprint from app.models.response.curseforge import ( FingerprintResponse, @@ -29,7 +34,7 @@ from app.utils.loger import log from app.utils.response_cache import cache -from app.database.mongodb import aio_mongo_engine +from app.database import aio_mongo_engine, aio_redis_engine mcim_config = MCIMConfig.load() @@ -113,7 +118,8 @@ async def check_search_result(request: Request, res: dict): not_found_modids = modids - set([mod.id for mod in mod_models]) if not_found_modids: - sync_mutil_mods.send(modIds=list(not_found_modids)) + # sync_mutil_mods.send(modIds=list(not_found_modids)) + await add_curseforge_modIds_to_queue(modIds=list(not_found_modids)) log.debug(f"modIds: {not_found_modids} not found, send sync task.") else: log.debug(f"All Mod: {not_found_modids} found.") @@ -185,19 +191,23 @@ async def curseforge_search( response_model=Mod, ) @cache(expire=mcim_config.expire_second.curseforge.mod) -async def curseforge_mod(modId: int, request: Request): +async def curseforge_mod( + modId: Annotated[int, Field(ge=30000, lt=9999999)], request: Request +): if request.state.force_sync: - sync_mod.send(modId=modId) + # sync_mod.send(modId=modId) + await add_curseforge_modIds_to_queue(modIds=[modId]) log.debug(f"modId: {modId} force sync.") return UncachedResponse() - # 排除小于 30000 的 modid - if not modId >= 30000: return UncachedResponse() + # # 排除小于 30000 的 modid + # if not modId >= 30000: return UncachedResponse() trustable: bool = True mod_model: Optional[Mod] = await request.app.state.aio_mongo_engine.find_one( Mod, Mod.id == modId ) if mod_model is None: - sync_mod.send(modId=modId) + # sync_mod.send(modId=modId) + await add_curseforge_modIds_to_queue(modIds=[modId]) log.debug(f"modId: {modId} not found, send sync task.") return UncachedResponse() # elif ( @@ -211,12 +221,13 @@ async def curseforge_mod(modId: int, request: Request): # trustable = False return TrustableResponse( # content=CurseforgeBaseResponse(data=mod_model).model_dump(), trustable=trustable - content=CurseforgeBaseResponse(data=mod_model), trustable=trustable + content=CurseforgeBaseResponse(data=mod_model), + trustable=trustable, ) class modIds_item(BaseModel): - modIds: List[int] + modIds: List[Annotated[int, Field(ge=30000, lt=9999999)]] filterPcOnly: Optional[bool] = True @@ -229,7 +240,8 @@ class modIds_item(BaseModel): # @cache(expire=mcim_config.expire_second.curseforge.mod) async def curseforge_mods(item: modIds_item, request: Request): if request.state.force_sync: - sync_mutil_mods.send(modIds=item.modIds) + # sync_mutil_mods.send(modIds=item.modIds) + await add_curseforge_modIds_to_queue(modIds=item.modIds) log.debug(f"modIds: {item.modIds} force sync.") # return UncachedResponse() return TrustableResponse( @@ -237,7 +249,7 @@ async def curseforge_mods(item: modIds_item, request: Request): trustable=False, ) # 排除小于 30000 的 modid - item.modIds = [modId for modId in item.modIds if modId >= 30000] + # item.modIds = [modId for modId in item.modIds if modId >= 30000] trustable: bool = True mod_models: Optional[List[Mod]] = await request.app.state.aio_mongo_engine.find( Mod, query.in_(Mod.id, item.modIds) @@ -245,7 +257,8 @@ async def curseforge_mods(item: modIds_item, request: Request): mod_model_count = len(mod_models) item_count = len(item.modIds) if not mod_models: - sync_mutil_mods.send(modIds=item.modIds) + # sync_mutil_mods.send(modIds=item.modIds) + await add_curseforge_modIds_to_queue(modIds=item.modIds) log.debug(f"modIds: {item.modIds} not found, send sync task.") # return UncachedResponse() return TrustableResponse( @@ -255,7 +268,8 @@ async def curseforge_mods(item: modIds_item, request: Request): elif mod_model_count != item_count: # 找到不存在的 modid not_match_modids = list(set(item.modIds) - set([mod.id for mod in mod_models])) - sync_mutil_mods.send(modIds=not_match_modids) + # sync_mutil_mods.send(modIds=not_match_modids) + await add_curseforge_modIds_to_queue(modIds=not_match_modids) log.debug( f"modIds: {item.modIds} {mod_model_count}/{item_count} not found, send sync task." ) @@ -263,20 +277,20 @@ async def curseforge_mods(item: modIds_item, request: Request): # content = [] # expire_modid: List[int] = [] # for model in mod_models: - # expire - # if ( - # model.sync_at.timestamp() + mcim_config.expire_second.curseforge.mod - # < time.time() - # ): - # expire_modid.append(model.id) - # log.debug( - # f'modId: {model.id} expired, send sync task, sync_at {model.sync_at.strftime("%Y-%m-%dT%H:%M:%SZ")}.' - # ) - # content.append(model.model_dump()) + # expire + # if ( + # model.sync_at.timestamp() + mcim_config.expire_second.curseforge.mod + # < time.time() + # ): + # expire_modid.append(model.id) + # log.debug( + # f'modId: {model.id} expired, send sync task, sync_at {model.sync_at.strftime("%Y-%m-%dT%H:%M:%SZ")}.' + # ) + # content.append(model.model_dump()) # if expire_modid: # trustable = False - # sync_mutil_mods.send(modIds=expire_modid) - # log.debug(f"modIds: {expire_modid} expired, send sync task.") + # sync_mutil_mods.send(modIds=expire_modid) + # log.debug(f"modIds: {expire_modid} expired, send sync task.") return TrustableResponse( # content=CurseforgeBaseResponse(data=content).model_dump(), content=CurseforgeBaseResponse(data=mod_models), @@ -359,7 +373,7 @@ def convert_modloadertype(type_id: int) -> Optional[str]: # ) async def curseforge_mod_files( request: Request, - modId: int, + modId: Annotated[int, Field(gt=30000, lt=9999999)], gameVersion: Optional[str] = None, modLoaderType: Optional[int] = None, # gameVersionTypeId: Optional[int] = None, @@ -367,11 +381,12 @@ async def curseforge_mod_files( pageSize: Optional[int] = 50, ): if request.state.force_sync: - sync_mod.send(modId=modId) + # sync_mod.send(modId=modId) + await add_curseforge_modIds_to_queue(modIds=[modId]) log.debug(f"modId: {modId} force sync.") return UncachedResponse() # 排除小于 30000 的 modid - if not modId >= 30000: return UncachedResponse() + # if not modId >= 30000: return UncachedResponse() # 定义聚合管道 match_conditions = {"modId": modId} gameVersionFilter = [] @@ -408,7 +423,8 @@ async def curseforge_mod_files( result = await files_collection.aggregate(pipeline).to_list(length=None) if not result or not result[0]["documents"]: - sync_mod.send(modId=modId) + # sync_mod.send(modId=modId) + await add_curseforge_modIds_to_queue(modIds=[modId]) log.debug(f"modId: {modId} not found, send sync task.") return UncachedResponse() @@ -436,7 +452,7 @@ async def curseforge_mod_files( class fileIds_item(BaseModel): - fileIds: List[int] + fileIds: List[Annotated[int, Field(ge=530000, lt=99999999)]] # get files @@ -448,22 +464,25 @@ class fileIds_item(BaseModel): # @cache(expire=mcim_config.expire_second.curseforge.file) async def curseforge_files(item: fileIds_item, request: Request): if request.state.force_sync: - sync_mutil_files.send(fileIds=item.fileIds) + await add_curseforge_fileIds_to_queue(fileIds=item.fileIds) + log.debug(f"fileIds: {item.fileIds} force sync.") return UncachedResponse() # 排除小于 530000 的 fileid - item.fileIds = [fileId for fileId in item.fileIds if fileId >= 530000] + # item.fileIds = [fileId for fileId in item.fileIds if fileId >= 530000] trustable = True file_models: Optional[List[File]] = await request.app.state.aio_mongo_engine.find( File, query.in_(File.id, item.fileIds) ) if not file_models: - sync_mutil_files.send(fileIds=item.fileIds) + await add_curseforge_fileIds_to_queue(fileIds=item.fileIds) return UncachedResponse() elif len(file_models) != len(item.fileIds): # 找到不存在的 fileid - not_match_fileids = list(set(item.fileIds) - set([file.id for file in file_models])) - sync_mutil_files.send(fileIds=not_match_fileids) + not_match_fileids = list( + set(item.fileIds) - set([file.id for file in file_models]) + ) + await add_curseforge_fileIds_to_queue(fileIds=not_match_fileids) trustable = False # content = [] # expire_fileid: List[int] = [] @@ -474,15 +493,15 @@ async def curseforge_files(item: fileIds_item, request: Request): # < time.time() # ): # trustable = False - - # expire_fileid.append(model.id) - # log.debug( - # f'fileId: {model.id} expired, send sync task, sync_at {model.sync_at.strftime("%Y-%m-%dT%H:%M:%SZ")}.' - # ) - # content.append(model.model_dump()) + + # expire_fileid.append(model.id) + # log.debug( + # f'fileId: {model.id} expired, send sync task, sync_at {model.sync_at.strftime("%Y-%m-%dT%H:%M:%SZ")}.' + # ) + # content.append(model.model_dump()) # if expire_fileid: - # sync_mutil_files.send(fileIds=expire_fileid) - # trustable = False + # await add_curseforge_fileids_to_queue(fileIds=expire_fileid) + # trustable = False return TrustableResponse( content=CurseforgeBaseResponse(data=file_models), trustable=trustable, @@ -495,19 +514,25 @@ async def curseforge_files(item: fileIds_item, request: Request): description="Curseforge Mod 文件信息", ) @cache(expire=mcim_config.expire_second.curseforge.file) -async def curseforge_mod_file(modId: int, fileId: int, request: Request): +async def curseforge_mod_file( + modId: Annotated[int, Field(ge=30000, lt=9999999)], + fileId: Annotated[int, Field(ge=530000, lt=99999999)], + request: Request, +): if request.state.force_sync: - sync_file.send(modId=modId, fileId=fileId) + # sync_file.send(modId=modId, fileId=fileId) + await add_curseforge_fileIds_to_queue(fileIds=[fileId]) log.debug(f"modId: {modId} fileId: {fileId} force sync.") return UncachedResponse() # 排除小于 530000 的 fileid - if not fileId >= 530000 or not modId >= 30000: return UncachedResponse() + # if not fileId >= 530000 or not modId >= 30000: return UncachedResponse() trustable = True model: Optional[File] = await request.app.state.aio_mongo_engine.find_one( File, File.modId == modId, File.id == fileId ) if model is None: - sync_file.send(modId=modId, fileId=fileId) + # sync_file.send(modId=modId, fileId=fileId) + await add_curseforge_fileIds_to_queue(fileIds=[fileId]) return UncachedResponse() # elif ( # model.sync_at.timestamp() + mcim_config.expire_second.curseforge.file @@ -529,14 +554,19 @@ async def curseforge_mod_file(modId: int, fileId: int, request: Request): description="Curseforge Mod 文件下载地址", ) # @cache(expire=mcim_config.expire_second.curseforge.file) -async def curseforge_mod_file_download_url(modId: int, fileId: int, request: Request): +async def curseforge_mod_file_download_url( + modId: Annotated[int, Field(ge=30000, lt=9999999)], + fileId: Annotated[int, Field(ge=530000, lt=99999999)], + request: Request, +): # 排除小于 530000 的 fileid - if not fileId >= 530000 or not modId >= 30000: return UncachedResponse() + # if not fileId >= 530000 or not modId >= 30000: return UncachedResponse() model: Optional[File] = await request.app.state.aio_mongo_engine.find_one( File, File.modId == modId, File.id == fileId ) if model is None: - sync_file.send(modId=modId, fileId=fileId) + # sync_file.send(modId=modId, fileId=fileId) + await add_curseforge_fileIds_to_queue(fileIds=[fileId]) return UncachedResponse() return TrustableResponse( content=CurseforgeBaseResponse(data=model.downloadUrl).model_dump(), @@ -545,7 +575,7 @@ async def curseforge_mod_file_download_url(modId: int, fileId: int, request: Req class fingerprints_item(BaseModel): - fingerprints: List[int] + fingerprints: List[Annotated[int, Field(lt=99999999999)]] @v1_router.post( @@ -559,7 +589,7 @@ async def curseforge_fingerprints(item: fingerprints_item, request: Request): 未找到所有 fingerprint 会视为不可信,因为不存在的 fingerprint 会被记录 """ if request.state.force_sync: - sync_fingerprints.send(fingerprints=item.fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=item.fingerprints) log.debug(f"fingerprints: {item.fingerprints} force sync.") return UncachedResponse() trustable = True @@ -569,10 +599,11 @@ async def curseforge_fingerprints(item: fingerprints_item, request: Request): ) ) not_match_fingerprints = list( - set(item.fingerprints) - set([fingerprint.id for fingerprint in fingerprints_models]) - ) + set(item.fingerprints) + - set([fingerprint.id for fingerprint in fingerprints_models]) + ) if not fingerprints_models: - sync_fingerprints.send(fingerprints=item.fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=item.fingerprints) trustable = False return TrustableResponse( content=CurseforgeBaseResponse( @@ -582,7 +613,7 @@ async def curseforge_fingerprints(item: fingerprints_item, request: Request): ) elif len(fingerprints_models) != len(item.fingerprints): # 找到不存在的 fingerprint - sync_fingerprints.send(fingerprints=not_match_fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=not_match_fingerprints) trustable = False exactFingerprints = [] result_fingerprints_models = [] @@ -625,7 +656,7 @@ async def curseforge_fingerprints_432(item: fingerprints_item, request: Request) 未找到所有 fingerprint 会视为不可信,因为不存在的 fingerprint 会被记录 """ if request.state.force_sync: - sync_fingerprints.send(fingerprints=item.fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=item.fingerprints) log.debug(f"fingerprints: {item.fingerprints} force sync.") return UncachedResponse() trustable = True @@ -635,10 +666,11 @@ async def curseforge_fingerprints_432(item: fingerprints_item, request: Request) ) ) not_match_fingerprints = list( - set(item.fingerprints) - set([fingerprint.id for fingerprint in fingerprints_models]) - ) + set(item.fingerprints) + - set([fingerprint.id for fingerprint in fingerprints_models]) + ) if not fingerprints_models: - sync_fingerprints.send(fingerprints=item.fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=item.fingerprints) trustable = False return TrustableResponse( content=CurseforgeBaseResponse( @@ -647,7 +679,7 @@ async def curseforge_fingerprints_432(item: fingerprints_item, request: Request) trustable=trustable, ) elif len(fingerprints_models) != len(item.fingerprints): - sync_fingerprints.send(fingerprints=not_match_fingerprints) + await add_curseforge_fingerprints_to_queue(fingerprints=not_match_fingerprints) trustable = False exactFingerprints = [] result_fingerprints_models = [] @@ -692,7 +724,7 @@ async def curseforge_categories(request: Request): categories = await request.app.state.aio_redis_engine.hget( "curseforge", "categories" ) - + return TrustableResponse( content=CurseforgeBaseResponse(data=json.loads(categories)).model_dump() - ) \ No newline at end of file + ) diff --git a/app/controller/file_cdn/__init__.py b/app/controller/file_cdn/__init__.py index 3ae423f..342ea36 100644 --- a/app/controller/file_cdn/__init__.py +++ b/app/controller/file_cdn/__init__.py @@ -18,6 +18,8 @@ from app.sync.modrinth import sync_project from app.sync.curseforge import sync_mutil_files +from app.sync_queue.curseforge import add_curseforge_fileIds_to_queue +from app.sync_queue.modrinth import add_modrinth_project_ids_to_queue from app.utils.metric import ( FILE_CDN_FORWARD_TO_ORIGIN_COUNT, FILE_CDN_FORWARD_TO_OPEN93HOME_COUNT, @@ -137,7 +139,8 @@ async def return_open93home_response(sha1: str, request: Request): return return_origin_response() else: # 文件信息不存在 - sync_project.send(project_id) + # sync_project.send(project_id) + await add_modrinth_project_ids_to_queue(project_ids=[project_id]) log.debug(f"sync project {project_id} task send.") return return_origin_response() @@ -211,7 +214,8 @@ async def return_open93home_response(sha1: str, request: Request): ) else: if fileid >= 530000: - sync_mutil_files.send([fileid]) + # sync_mutil_files.send([fileid]) + await add_curseforge_fileIds_to_queue(fileIds=[fileid]) log.debug(f"sync fileId {fileid} task send.") return return_origin_response() diff --git a/app/controller/modrinth/v2/__init__.py b/app/controller/modrinth/v2/__init__.py index c47dd60..bdddc37 100644 --- a/app/controller/modrinth/v2/__init__.py +++ b/app/controller/modrinth/v2/__init__.py @@ -20,6 +20,12 @@ sync_hash, sync_multi_hashes, sync_tags, + async_tags +) +from app.sync_queue.modrinth import ( + add_modrinth_project_ids_to_queue, + add_modrinth_version_ids_to_queue, + add_modrinth_hashes_to_queue, ) from app.config.mcim import MCIMConfig from app.utils.response import ( @@ -79,7 +85,8 @@ async def modrinth_statistics(request: Request): @cache(expire=mcim_config.expire_second.modrinth.project) async def modrinth_project(idslug: str, request: Request): if request.state.force_sync: - sync_project.send(idslug) + # sync_project.send(idslug) + await add_modrinth_project_ids_to_queue(project_ids=[idslug]) log.debug(f"Project {idslug} force sync.") return ForceSyncResponse() trustable = True @@ -87,7 +94,8 @@ async def modrinth_project(idslug: str, request: Request): Project, query.or_(Project.id == idslug, Project.slug == idslug) ) if model is None: - sync_project.send(idslug) + # sync_project.send(idslug) + await add_modrinth_project_ids_to_queue(project_ids=[idslug]) log.debug(f"Project {idslug} not found, send sync task.") return UncachedResponse() elif model.found == False: @@ -113,7 +121,8 @@ async def modrinth_project(idslug: str, request: Request): async def modrinth_projects(ids: str, request: Request): ids_list = json.loads(ids) if request.state.force_sync: - sync_multi_projects.send(project_ids=ids_list) + # sync_multi_projects.send(project_ids=ids_list) + await add_modrinth_project_ids_to_queue(project_ids=ids_list) log.debug(f"Projects {ids_list} force sync.") return ForceSyncResponse() trustable = True @@ -130,13 +139,15 @@ async def modrinth_projects(ids: str, request: Request): models_count = len(models) ids_count = len(ids_list) if not models: - sync_multi_projects.send(project_ids=ids_list) + # sync_multi_projects.send(project_ids=ids_list) + await add_modrinth_project_ids_to_queue(project_ids=ids_list) log.debug(f"Projects {ids_list} not found, send sync task.") return UncachedResponse() elif models_count != ids_count: # 找出没找到的 project_id not_match_ids = list(set(ids_list) - set([model.id for model in models])) - sync_multi_projects.send(project_ids=not_match_ids) + # sync_multi_projects.send(project_ids=not_match_ids) + await add_modrinth_project_ids_to_queue(project_ids=not_match_ids) log.debug( f"Projects {not_match_ids} {not_match_ids}/{ids_count} not found, send sync task." ) @@ -175,7 +186,8 @@ async def modrinth_project_versions(idslug: str, request: Request): 先查 Project 的 Version 列表再拉取...避免遍历整个 Version 表 """ if request.state.force_sync: - sync_project.send(idslug) + # sync_project.send(idslug) + await add_modrinth_project_ids_to_queue(project_ids=[idslug]) log.debug(f"Project {idslug} force sync.") return ForceSyncResponse() trustable = True @@ -185,7 +197,8 @@ async def modrinth_project_versions(idslug: str, request: Request): ) ) if not project_model: - sync_project.send(idslug) + # sync_project.send(idslug) + await add_modrinth_project_ids_to_queue(project_ids=[idslug]) log.debug(f"Project {idslug} not found, send sync task.") return UncachedResponse() else: @@ -238,7 +251,8 @@ async def check_search_result(request: Request, search_result: dict): ) if not_found_project_ids: - sync_multi_projects.send(project_ids=list(not_found_project_ids)) + # sync_multi_projects.send(project_ids=list(not_found_project_ids)) + await add_modrinth_project_ids_to_queue(project_ids=list(not_found_project_ids)) log.debug(f"Projects {not_found_project_ids} not found, send sync task.") else: log.debug(f"All Projects {not_found_project_ids} found.") @@ -296,7 +310,8 @@ async def modrinth_version( version_id: Annotated[str, Path(alias="id")], request: Request ): if request.state.force_sync: - sync_version.send(version_id=version_id) + # sync_version.send(version_id=version_id) + await add_modrinth_version_ids_to_queue(version_ids=[version_id]) log.debug(f"Version {version_id} force sync.") return ForceSyncResponse() trustable = True @@ -306,7 +321,8 @@ async def modrinth_version( Version.id == version_id, ) if model is None: - sync_version.send(version_id=version_id) + # sync_version.send(version_id=version_id) + await add_modrinth_version_ids_to_queue(version_ids=[version_id]) log.debug(f"Version {version_id} not found, send sync task.") return UncachedResponse() elif model.found == False: @@ -334,7 +350,8 @@ async def modrinth_versions(ids: str, request: Request): trustable = True ids_list = json.loads(ids) if request.state.force_sync: - sync_multi_versions.send(version_ids=ids_list) + # sync_multi_versions.send(version_ids=ids_list) + await add_modrinth_version_ids_to_queue(version_ids=ids_list) log.debug(f"Versions {ids} force sync.") return ForceSyncResponse() models: List[Version] = await request.app.state.aio_mongo_engine.find( @@ -343,11 +360,13 @@ async def modrinth_versions(ids: str, request: Request): models_count = len(models) ids_count = len(ids_list) if not models: - sync_multi_versions.send(version_ids=ids_list) + # sync_multi_versions.send(version_ids=ids_list) + await add_modrinth_version_ids_to_queue(version_ids=ids_list) log.debug(f"Versions {ids_list} not found, send sync task.") return UncachedResponse() elif models_count != ids_count: - sync_multi_versions.send(version_ids=ids_list) + # sync_multi_versions.send(version_ids=ids_list) + await add_modrinth_version_ids_to_queue(version_ids=ids_list) log.debug( f"Versions {ids_list} {models_count}/{ids_count} not completely found, send sync task." ) @@ -390,7 +409,8 @@ async def modrinth_file( algorithm: Optional[Algorithm] = Algorithm.sha1, ): if request.state.force_sync: - sync_hash.send(hash=hash_, algorithm=algorithm) + # sync_hash.send(hash=hash_, algorithm=algorithm) + await add_modrinth_hashes_to_queue([hash_], algorithm=algorithm.value) log.debug(f"File {hash_} force sync.") return ForceSyncResponse() trustable = True @@ -404,7 +424,8 @@ async def modrinth_file( ), ) if file is None: - sync_hash.send(hash=hash_, algorithm=algorithm) + # sync_hash.send(hash=hash_, algorithm=algorithm) + await add_modrinth_hashes_to_queue([hash_], algorithm=algorithm.value) log.debug(f"File {hash_} not found, send sync task.") return UncachedResponse() elif file.found == False: @@ -421,7 +442,8 @@ async def modrinth_file( Version, query.and_(Version.id == file.version_id, Version.found == True) ) if version is None: - sync_version.send(version_id=file.version_id) + # sync_version.send(version_id=file.version_id) + await add_modrinth_version_ids_to_queue(version_ids=[file.version_id]) log.debug(f"Version {file.version_id} not found, send sync task.") return UncachedResponse() # elif ( @@ -450,7 +472,8 @@ class HashesQuery(BaseModel): # @cache(expire=mcim_config.expire_second.modrinth.file) async def modrinth_files(items: HashesQuery, request: Request): if request.state.force_sync: - sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + # sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + await add_modrinth_hashes_to_queue(items.hashes, algorithm=items.algorithm.value) log.debug(f"Files {items.hashes} force sync.") return ForceSyncResponse() trustable = True @@ -469,7 +492,8 @@ async def modrinth_files(items: HashesQuery, request: Request): model_count = len(files_models) hashes_count = len(items.hashes) if not files_models: - sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + # sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + await add_modrinth_hashes_to_queue(items.hashes, algorithm=items.algorithm.value) log.debug("Files not found, send sync task.") return UncachedResponse() elif model_count != hashes_count: @@ -477,11 +501,13 @@ async def modrinth_files(items: HashesQuery, request: Request): not_found_hashes = list(set(items.hashes) - set( [file.hashes.sha1 if items.algorithm == Algorithm.sha1 else file.hashes.sha512 for file in files_models] )) - sync_multi_hashes.send(hashes=not_found_hashes, algorithm=items.algorithm) - log.debug( - f"Files {not_found_hashes} {len(not_found_hashes)}/{hashes_count} not completely found, send sync task." - ) - trustable = False + if not_found_hashes: + # sync_multi_hashes.send(hashes=not_found_hashes, algorithm=items.algorithm) + await add_modrinth_hashes_to_queue(not_found_hashes, algorithm=items.algorithm.value) + log.debug( + f"Files {not_found_hashes} {len(not_found_hashes)}/{hashes_count} not completely found, send sync task." + ) + trustable = False # Don't need to check version expire version_ids = [file.version_id for file in files_models] @@ -492,7 +518,9 @@ async def modrinth_files(items: HashesQuery, request: Request): version_model_count = len(version_models) file_model_count = len(files_models) if not version_models: - sync_multi_versions.send(version_ids=version_ids) + # sync_multi_versions.send(version_ids=version_ids) + # 一个版本都没找到,直接重新同步 + await add_modrinth_version_ids_to_queue(version_ids=version_ids) log.debug("Versions not found, send sync task.") return UncachedResponse() elif version_model_count != file_model_count: @@ -500,11 +528,13 @@ async def modrinth_files(items: HashesQuery, request: Request): not_found_version_ids = list(set(version_ids) - set( [version.id for version in version_models] )) - sync_multi_versions.send(version_ids=not_found_version_ids) - log.debug( - f"Versions {not_found_version_ids} {len(not_found_version_ids)}/{file_model_count} not completely found, send sync task." - ) - trustable = False + if not_found_version_ids: + # sync_multi_versions.send(version_ids=not_found_version_ids) + await add_modrinth_version_ids_to_queue(version_ids=not_found_version_ids) + log.debug( + f"Versions {not_found_version_ids} {len(not_found_version_ids)}/{file_model_count} not completely found, send sync task." + ) + trustable = False result = {} for version in version_models: result[ @@ -532,7 +562,8 @@ async def modrinth_file_update( algorithm: Optional[Algorithm] = Algorithm.sha1, ): if request.state.force_sync: - sync_hash.send(hash=hash_, algorithm=algorithm) + # sync_hash.send(hash=hash_, algorithm=algorithm) + await add_modrinth_hashes_to_queue([hash_], algorithm=algorithm.value) log.debug(f"Hash {hash_} force sync.") return ForceSyncResponse() trustable = True @@ -584,7 +615,8 @@ async def modrinth_file_update( # ) trustable = False else: - sync_hash.send(hash=hash_, algorithm=algorithm.value) + # sync_hash.send(hash=hash_, algorithm=algorithm.value) + await add_modrinth_hashes_to_queue([hash_], algorithm=algorithm.value) log.debug(f"Hash {hash_} not found, send sync task") return UncachedResponse() return TrustableResponse(content=version_result, trustable=trustable) @@ -601,7 +633,8 @@ class MultiUpdateItems(BaseModel): # @cache(expire=mcim_config.expire_second.modrinth.file) async def modrinth_mutil_file_update(request: Request, items: MultiUpdateItems): if request.state.force_sync: - sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + # sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm) + await add_modrinth_hashes_to_queue(items.hashes, algorithm=items.algorithm.value) log.debug(f"Hashes {items.hashes} force sync.") return ForceSyncResponse() trustable = True @@ -648,7 +681,8 @@ async def modrinth_mutil_file_update(request: Request, items: MultiUpdateItems): ] versions_result = await files_collection.aggregate(pipeline).to_list(length=None) if len(versions_result) == 0: - sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm.value) + # sync_multi_hashes.send(hashes=items.hashes, algorithm=items.algorithm.value) + await add_modrinth_hashes_to_queue(items.hashes, algorithm=items.algorithm.value) log.debug(f"Hashes {items.hashes} not found, send sync task") return UncachedResponse() elif len(versions_result) != len(items.hashes): @@ -656,9 +690,11 @@ async def modrinth_mutil_file_update(request: Request, items: MultiUpdateItems): not_found_hashes = list(set(items.hashes) - set( [version["_id"] for version in versions_result] )) - sync_multi_hashes.send(hashes=not_found_hashes, algorithm=items.algorithm.value) - log.debug(f"Hashes {not_found_hashes} not completely found, send sync task.") - trustable = False + if not_found_hashes: + # sync_multi_hashes.send(hashes=not_found_hashes, algorithm=items.algorithm.value) + await add_modrinth_hashes_to_queue(not_found_hashes, algorithm=items.algorithm.value) + log.debug(f"Hashes {not_found_hashes} not completely found, send sync task.") + trustable = False else: # check expire resp = {} @@ -686,7 +722,7 @@ async def modrinth_mutil_file_update(request: Request, items: MultiUpdateItems): async def modrinth_tag_categories(request: Request): category = await request.app.state.aio_redis_engine.hget("modrinth", "categories") if category is None: - sync_tags() + await async_tags() log.debug("Category not found, sync.") category = await request.app.state.aio_redis_engine.hget("modrinth", "categories") return TrustableResponse(content=json.loads(category)) @@ -701,7 +737,7 @@ async def modrinth_tag_categories(request: Request): async def modrinth_tag_loaders(request: Request): loader = await request.app.state.aio_redis_engine.hget("modrinth", "loaders") if loader is None: - sync_tags() + await async_tags() log.debug("Loader not found, sync.") loader = await request.app.state.aio_redis_engine.hget("modrinth", "loaders") return TrustableResponse(content=json.loads(loader)) @@ -718,7 +754,7 @@ async def modrinth_tag_game_versions(request: Request): "modrinth", "game_versions" ) if game_version is None: - sync_tags.send() + await async_tags() log.debug("Game Version not found, sync.") game_version = await request.app.state.aio_redis_engine.hget( "modrinth", "game_versions" @@ -737,7 +773,7 @@ async def modrinth_tag_donation_platforms(request: Request): "modrinth", "donation_platform" ) if donation_platform is None: - sync_tags() + await async_tags() log.debug("Donation Platform not found, sync.") donation_platform = await request.app.state.aio_redis_engine.hget( "modrinth", "donation_platform" @@ -756,7 +792,7 @@ async def modrinth_tag_project_types(request: Request): "modrinth", "project_type" ) if project_type is None: - sync_tags() + await async_tags() log.debug("Project Type not found, sync.") project_type = await request.app.state.aio_redis_engine.hget( "modrinth", "project_type" @@ -773,7 +809,8 @@ async def modrinth_tag_project_types(request: Request): async def modrinth_tag_side_types(request: Request): side_type = await request.app.state.aio_redis_engine.hget("modrinth", "side_type") if side_type is None: - sync_tags() + await async_tags() log.debug("Side Type not found, sync.") side_type = await request.app.state.aio_redis_engine.hget("modrinth", "side_type") return TrustableResponse(content=json.loads(side_type)) + diff --git a/app/database/_redis.py b/app/database/_redis.py index f89b490..15acb2a 100644 --- a/app/database/_redis.py +++ b/app/database/_redis.py @@ -8,7 +8,7 @@ aio_redis_engine: AioRedis = None sync_redis_engine: Redis = None - +sync_queuq_redis_engine: AioRedis = None def init_redis_aioengine() -> AioRedis: global aio_redis_engine @@ -31,6 +31,16 @@ def init_sync_redis_engine() -> Redis: ) return sync_redis_engine +def init_sync_queue_redis_engine() -> AioRedis: + global sync_queuq_redis_engine + sync_queuq_redis_engine = AioRedis( + host=_redis_config.host, + port=_redis_config.port, + password=_redis_config.password, + db=_redis_config.database.sync_queue, + ) + return sync_queuq_redis_engine + async def close_aio_redis_engine(): """ @@ -45,7 +55,7 @@ async def close_aio_redis_engine(): aio_redis_engine = None -def close_redis_engine(): +def close_sync_redis_engine(): """ Close redis when process stopped. """ @@ -57,8 +67,20 @@ def close_redis_engine(): log.warning("no redis connection to close") sync_redis_engine = None +async def close_sync_queue_redis_engine(): + """ + Close redis when process stopped. + """ + global sync_queuq_redis_engine + if sync_queuq_redis_engine is not None: + await sync_queuq_redis_engine.close() + log.success("closed redis connection") + else: + log.warning("no redis connection to close") + sync_queuq_redis_engine = None aio_redis_engine: AioRedis = init_redis_aioengine() sync_redis_engine: Redis = init_sync_redis_engine() +sync_queuq_redis_engine: AioRedis = init_sync_queue_redis_engine() log.success("Redis connection established") # noqa diff --git a/app/sync/__init__.py b/app/sync/__init__.py index c840450..2ab7d4f 100644 --- a/app/sync/__init__.py +++ b/app/sync/__init__.py @@ -7,6 +7,7 @@ from app.database.mongodb import init_mongodb_syncengine, sync_mongo_engine from app.database._redis import ( init_sync_redis_engine, + close_sync_redis_engine, sync_redis_engine, ) from app.config import RedisdbConfig @@ -52,3 +53,5 @@ from app.sync.modrinth import * from app.sync.curseforge import * + +# close_sync_redis_engine() \ No newline at end of file diff --git a/app/sync/modrinth.py b/app/sync/modrinth.py index 68d94ca..fc769ee 100644 --- a/app/sync/modrinth.py +++ b/app/sync/modrinth.py @@ -32,7 +32,7 @@ ) from app.models.database.modrinth import Project, File, Version from app.models.database.file_cdn import File as FileCDN -from app.utils.network import request_sync +from app.utils.network import request_sync, request from app.exceptions import ResponseCodeException from app.config import MCIMConfig from app.utils.loger import log @@ -393,3 +393,20 @@ def sync_tags(): redis_engine.hset("modrinth", "donation_platform", json.dumps(donation_platform)) redis_engine.hset("modrinth", "project_type", json.dumps(project_type)) redis_engine.hset("modrinth", "side_type", json.dumps(side_type)) + +async def async_tags(): + # db 1 + categories = (await request(f"{API}/tag/category")).json() + loaders = (await request(f"{API}/tag/loader")).json() + game_versions = (await request(f"{API}/tag/game_version")).json() + donation_platform = (await request(f"{API}/tag/donation_platform")).json() + project_type = (await request(f"{API}/tag/project_type")).json() + side_type = (await request(f"{API}/tag/side_type")).json() + + + redis_engine.hset("modrinth", "categories", json.dumps(categories)) + redis_engine.hset("modrinth", "loaders", json.dumps(loaders)) + redis_engine.hset("modrinth", "game_versions", json.dumps(game_versions)) + redis_engine.hset("modrinth", "donation_platform", json.dumps(donation_platform)) + redis_engine.hset("modrinth", "project_type", json.dumps(project_type)) + redis_engine.hset("modrinth", "side_type", json.dumps(side_type)) \ No newline at end of file diff --git a/app/sync_queue/__init__.py b/app/sync_queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/sync_queue/curseforge.py b/app/sync_queue/curseforge.py new file mode 100644 index 0000000..ceeda46 --- /dev/null +++ b/app/sync_queue/curseforge.py @@ -0,0 +1,20 @@ +from typing import List, Union, Optional + +from app.database._redis import ( + sync_queuq_redis_engine, +) + +# curseforge +async def add_curseforge_modIds_to_queue(modIds: List[int]): + if len(modIds) != 0: + await sync_queuq_redis_engine.sadd("curseforge_modids", *modIds) + + +async def add_curseforge_fileIds_to_queue(fileIds: List[int]): + if len(fileIds) != 0: + await sync_queuq_redis_engine.sadd("curseforge_fileids", *fileIds) + + +async def add_curseforge_fingerprints_to_queue(fingerprints: List[int]): + if len(fingerprints) != 0: + await sync_queuq_redis_engine.sadd("curseforge_fingerprints", *fingerprints) diff --git a/app/sync_queue/modrinth.py b/app/sync_queue/modrinth.py new file mode 100644 index 0000000..5618b51 --- /dev/null +++ b/app/sync_queue/modrinth.py @@ -0,0 +1,23 @@ +from typing import List, Union, Optional + +from app.database._redis import ( + sync_queuq_redis_engine, +) + + +# modrinth +async def add_modrinth_project_ids_to_queue(project_ids: List[str]): + if len(project_ids) != 0: + await sync_queuq_redis_engine.sadd("modrinth_project_ids", *project_ids) + + +async def add_modrinth_version_ids_to_queue(version_ids: List[str]): + if len(version_ids) != 0: + await sync_queuq_redis_engine.sadd("modrinth_version_ids", *version_ids) + + +async def add_modrinth_hashes_to_queue(hashes: List[str], algorithm: str = "sha1"): + if algorithm not in ["sha1", "sha512"]: + raise ValueError("algorithm must be one of sha1, sha512") + if len(hashes) != 0: + await sync_queuq_redis_engine.sadd(f"modrinth_hashes_{algorithm}", *hashes) diff --git a/app/utils/loger/__init__.py b/app/utils/loger/__init__.py index 2b86774..57aac70 100644 --- a/app/utils/loger/__init__.py +++ b/app/utils/loger/__init__.py @@ -37,9 +37,9 @@ def emit(self, record): sys.stdout, level="DEBUG" if mcim_config.debug else "INFO", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {module}.{function}.{line} | {message}", - colorize=True, - backtrace=True, - diagnose=True, + # colorize=True, + # backtrace=True, + # diagnose=True, ) # 拦截标准日志并重定向到 Loguru