diff --git a/app.py b/app.py index 3ed32ac..d30d9aa 100644 --- a/app.py +++ b/app.py @@ -15,10 +15,9 @@ from models.account import ModelAccount, ModelPortfolio from models.fund import ModelFund, ModelFundNav from models.fund_user import ModelFundPosition, ModelFundTransaction -from models.task import ModelTask from pages.account import render_account_page from pages.home import render_home_page -from pages.task import render_task_page +from kz_dash.page.task.task_page import render_task_page from pages.transaction import render_transaction_page @@ -62,7 +61,6 @@ def init_application(): ModelFundTransaction, ModelFundNav, ModelFund, - ModelTask, ] ) logger.info("数据库初始化成功") diff --git a/backend/__init__.py b/backend/__init__.py index 185e80f..5d2ee09 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -5,10 +5,10 @@ from backend.api.fund import api as fund_ns from backend.api.portfolio import api as portfolio_ns from kz_dash.backend.api.runtime import api as runtime_ns -from backend.api.task import api as task_ns +from kz_dash.backend.api.task import api as task_ns from config import API_CONFIG -from scheduler.job_manager import JobManager -from scheduler.tasks import init_tasks +from kz_dash.scheduler.job_manager import JobManager +from task.task_init import init_tasks def register_blueprint(app): diff --git a/backend/api/task.py b/backend/api/task.py deleted file mode 100644 index abcd6e1..0000000 --- a/backend/api/task.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import logging - -from flask import request -from flask_restx import Namespace, Resource, fields - -from kz_dash.backend.api.common import create_list_response_model, create_response_model -from kz_dash.models.database import get_record_list -from models.task import ModelTask -from scheduler.job_manager import JobManager -from scheduler.tasks import TaskFactory -from kz_dash.utility.response import format_response - -logger = logging.getLogger(__name__) - -api = Namespace("tasks", description="任务管理") - - -# 定义基础数据模型 -task_base = api.model( - "TaskBase", - { - "task_id": fields.String(required=True, description="任务ID"), - "name": fields.String(required=True, description="任务名称"), - "delay": fields.Integer(required=True, description="延迟时间(秒)"), - "status": fields.String(required=True, description="任务状态"), - "progress": fields.Integer(required=True, description="进度"), - "result": fields.String(description="执行结果"), - "error": fields.String(description="错误信息"), - "start_time": fields.DateTime(description="开始时间"), - "end_time": fields.DateTime(description="结束时间"), - "timeout": fields.Integer(required=True, description="超时时间(秒)"), - "created_at": fields.DateTime(required=True, description="创建时间"), - }, -) - -# 使用通用函数创建响应模型 -task_response = create_response_model(api, "Task", task_base) -task_list_response = create_list_response_model(api, "Task", task_base) - -# 定义任务创建输入模型 -task_input = api.model( - "TaskInput", - { - "type": fields.String( - required=True, - description="任务类型", - enum=list(TaskFactory().get_task_types().keys()), - ), - "delay": fields.Integer(description="延迟时间(秒)"), - "timeout": fields.Integer(description="超时时间(秒)"), - "params": fields.Raw(description="任务参数"), - }, -) - - -@api.route("/tasks") -class TaskList(Resource): - job_manager = JobManager() - - @api.doc("获取任务历史") - @api.param("limit", "返回记录数量限制", type=int) - @api.marshal_with(task_list_response) - def get(self): - """获取任务历史列表""" - try: - tasks = get_record_list( - ModelTask, {"parent_task_id__null": False}, order_by=["-created_at"] - ) - return format_response(data=tasks) - except (ValueError, KeyError) as e: - logger.error("获取任务历史失败: %s", str(e)) - return format_response(message=f"获取任务历史失败: {str(e)}", code=500) - - @api.doc("创建新任务") - @api.expect(task_input) - @api.marshal_with(task_response) - def post(self): - """创建新任务""" - try: - data = api.payload - task_type = data["type"] - - if task_type not in TaskFactory().get_task_types(): - return format_response(message="未知的任务类型", code=400) - - task_id = self.job_manager.add_task( - task_type=task_type, - delay=data.get("delay"), - timeout=data.get("timeout"), - **(data.get("params", {})), - ) - - return format_response(data={"task_id": task_id}, message="任务创建成功") - except (ValueError, KeyError, TypeError) as e: - logger.error("创建任务失败: %s", str(e)) - return format_response(message=f"创建任务失败: {str(e)}", code=500) - - -@api.route("/tasks/") -@api.param("task_id", "任务ID") -class Task(Resource): - job_manager = JobManager() - - @api.doc("获取任务状态") - @api.marshal_with(task_response) - def get(self, task_id): - """获取指定任务的状态""" - try: - status = self.job_manager.get_task(task_id) - if status.get("status") == "not_found": - return format_response(message="任务不存在", code=404) - return format_response(data=status) - except (ValueError, KeyError) as e: - logger.error("获取任务状态失败: %s", str(e)) - return format_response(message=f"获取任务状态失败: {str(e)}", code=500) - - # @api.doc("暂停任务") - # @api.marshal_with(task_response) - # def put(self, task_id): - # """暂停任务""" - # if self.job_manager.pause_task(task_id): - # return format_response(message="任务已暂停") - # return format_response(message="任务不存在或无法暂停", code=404) - - # @api.doc("恢复任务") - # @api.marshal_with(task_response) - # def post(self, task_id): - # """恢复任务""" - # if self.job_manager.resume_task(task_id): - # return format_response(message="任务已恢复") - # return format_response(message="任务不存在或无法恢复", code=404) diff --git a/data_source/implementations/eastmoney.py b/data_source/implementations/eastmoney.py index dedd1bf..3ae84b9 100644 --- a/data_source/implementations/eastmoney.py +++ b/data_source/implementations/eastmoney.py @@ -8,7 +8,7 @@ from bs4 import BeautifulSoup from data_source.interface import IDataSource -from scheduler.tasks.base import FundType +from task.task_config import FundType from kz_dash.utility.datetime_helper import format_date, get_timestamp, get_timestamp_ms from kz_dash.utility.string_helper import ( extract_number_with_unit, diff --git a/kz_dash b/kz_dash index 69bf4e7..9a6f0bc 160000 --- a/kz_dash +++ b/kz_dash @@ -1 +1 @@ -Subproject commit 69bf4e7f41411d45968c8949b3db4708cd9ea2d3 +Subproject commit 9a6f0bc5d14e1c354be104d304dbbcf1d96fcc05 diff --git a/models/database.py b/models/database.py index ccfca08..ab77494 100644 --- a/models/database.py +++ b/models/database.py @@ -97,7 +97,7 @@ def get_fund_transactions(portfolio_id: str) -> List[Dict[str, Any]]: def get_statistics() -> Dict[str, int]: """获取统计数据""" - from scheduler.tasks import TaskStatus + from kz_dash.scheduler.base_task import TaskStatus stats = { # main db diff --git a/pages/task/__init__.py b/pages/task/__init__.py deleted file mode 100644 index ff2146b..0000000 --- a/pages/task/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -"""任务管理页面模块 - -该模块包含任务管理的功能,主要包括: -- 任务的创建和管理 -- 任务进度监控 -- 任务状态更新 -- 任务详情查看 - -文件结构: -- task_page.py: 页面主渲染函数 -- table.py: 任务列表表格相关 -- task_modal.py: 任务创建弹窗及其回调 -- task_detail.py: 任务详情弹窗相关 -- task_utils.py: 通用工具函数和常量 -""" - -from pages.task.task_page import render_task_page - -__all__ = ["render_task_page"] diff --git a/pages/task/task_detail.py b/pages/task/task_detail.py deleted file mode 100644 index 2ebb192..0000000 --- a/pages/task/task_detail.py +++ /dev/null @@ -1,425 +0,0 @@ -# 任务详情弹窗模块 -# 1. Store组件 -# - task-detail-task-id: 存储正在查看的任务ID -# 提供任务详情查看功能: -# - 渲染任务详情弹窗 -# - 处理详情查看操作 - - -import logging -from collections import Counter - -import feffery_antd_components as fac -from dash import Input, Output, State, callback, callback_context, dcc, html, no_update, set_props -from dash.development.base_component import Component -from dash.exceptions import PreventUpdate -from feffery_utils_components import FefferyJsonViewer - -from config import PAGE_CONFIG -from kz_dash.models.database import get_record -from models.task import ModelTask -from pages.task.task_utils import ( - STATUS_COLORS, - STATUS_LABELS, - create_name_with_input_params, - create_status_tag, - get_sub_tasks, -) -from scheduler.job_manager import JobManager -from scheduler.tasks import TaskStatus -from kz_dash.utility.datetime_helper import format_datetime -from kz_dash.utility.string_helper import json_str_to_dict - -logger = logging.getLogger(__name__) - - -def render_task_detail_modal() -> Component: - """渲染任务详情对话框""" - return fac.Fragment( - [ - dcc.Store(id="task-detail-task-id", data=""), - fac.AntdModal( - id="task-detail-modal", - title="任务详情", - visible=False, - width=1000, - okText="关闭", - cancelText=None, - maskClosable=True, - centered=True, - children=[ - # 添加30秒定时刷新组件 - dcc.Interval( - id="task-detail-interval", - interval=PAGE_CONFIG["TASK_DETAIL_INTERVAL_TIME"], - ), - html.Div(id="task-detail-content"), # 使用独立的内容容器 - ], - bodyStyle={ - "padding": "24px", - "maxHeight": "80vh", - "overflowY": "auto", - }, - ), - ] - ) - - -@callback( - [ - Output("task-detail-modal", "visible"), - Output("task-detail-content", "children", allow_duplicate=True), - Output("task-detail-task-id", "data", allow_duplicate=True), - Output("task-detail-interval", "disabled", allow_duplicate=True), - ], - [ - Input("task-list", "nClicksButton"), - State("task-list", "clickedCustom"), - State("task-list", "recentlyButtonClickedRow"), - ], - prevent_initial_call=True, -) -def handle_task_detail(nClicks, custom, recentlyButtonClickedRow): - """处理任务详情查看""" - ctx = callback_context - if not ctx.triggered: - raise PreventUpdate - - try: - task_id = recentlyButtonClickedRow["task_id"] - - if custom != "view": - return no_update - - task = get_record(ModelTask, {"task_id": task_id}) - if not task: - logger.warning("未找到任务: %s", task_id) - return no_update - - # 是否更新任务详情内容 - is_update = task.status == TaskStatus.RUNNING or task.status == TaskStatus.PENDING - - content = get_task_detail(task.to_dict()) - logger.info("显示任务详情: %s", task_id) - return True, content, task_id, not is_update - - except Exception as e: - logger.error("处理任务详情失败: %s", str(e), exc_info=True) - return no_update - - -@callback( - Output("task-detail-task-id", "data", allow_duplicate=True), - Input("task-detail-modal", "visible"), - prevent_initial_call=True, -) -def update_task_detail_task_id(visible: bool) -> str: - """更新正在查看的任务ID""" - if visible: - return no_update - else: - return "" - - -# 添加回调函数 -@callback( - Output("global-message-container", "children", allow_duplicate=True), - [ - Input("subtask-table", "nClicksButton"), - State("subtask-table", "clickedCustom"), - State("subtask-table", "recentlyButtonClickedRow"), - ], - prevent_initial_call=True, -) -def handle_sub_task_action(nClicks, custom, recentlyButtonClickedRow): - """处理子任务操作的回调""" - if not nClicks: - raise PreventUpdate - - if custom == "view": - # 关闭当前任务详情弹窗 - set_props("task-detail-modal", {"visible": False}) - return no_update - elif custom == "copy": - task_id = recentlyButtonClickedRow["task_id"] - JobManager().copy_task(task_id) - return fac.AntdMessage(content="任务重新运行", type="success") - else: - return no_update - - -def get_task_detail(task: dict) -> list: - """获取任务详情内容 - - Args: - task_id: 任务ID - - Returns: - list: 任务详情内容组件列表 - """ - - try: - # 构建详情内容 - content = [ - # 基本信息描述列表 - fac.AntdDescriptions( - items=[ - {"label": "任务ID", "children": task["task_id"]}, - {"label": "任务名称", "children": task["name"]}, - { - "label": "状态", - "children": create_status_tag(task, show_error=False), - }, - {"label": "延迟执行", "children": f"{task['delay']}秒"}, - { - "label": "进度", - "children": fac.AntdProgress( - percent=task["progress"], - size="small", - ), - }, - ], - bordered=True, - column=2, - size="small", - labelStyle={ - "fontWeight": "bold", - "width": "100px", - "justifyContent": "flex-end", - "paddingRight": "8px", - }, - style={"marginBottom": "24px"}, - ), - fac.AntdDivider(children="时间信息", innerTextOrientation="left"), - fac.AntdDescriptions( - items=[ - {"label": "创建时间", "children": format_datetime(task["created_at"])}, - {"label": "更新时间", "children": format_datetime(task["updated_at"])}, - { - "label": "开始时间", - "children": ( - format_datetime(task["start_time"]) if task["start_time"] else "-" - ), - }, - { - "label": "结束时间", - "children": format_datetime(task["end_time"]) if task["end_time"] else "-", - }, - ], - bordered=True, - column=2, - size="small", - labelStyle={ - "fontWeight": "bold", - "width": "100px", - "justifyContent": "flex-end", - "paddingRight": "8px", - }, - style={"marginBottom": "24px"}, - ), - fac.AntdDivider(children="输入输出", innerTextOrientation="left"), - fac.AntdDescriptions( - items=[ - { - "label": "输入参数", - "children": FefferyJsonViewer( - data=json_str_to_dict(task.get("input_params", "{}")), - quotesOnKeys=False, - enableClipboard=False, - displayDataTypes=False, - displayObjectSize=False, - style={ - "fontSize": "12px", - "backgroundColor": "transparent", - }, - ), - "span": 3, - }, - { - "label": "执行结果", - "children": FefferyJsonViewer( - data=json_str_to_dict(task.get("result", "{}")), - quotesOnKeys=False, - enableClipboard=False, - displayDataTypes=False, - displayObjectSize=False, - style={ - "fontSize": "12px", - "backgroundColor": "transparent", - }, - ), - "span": 3, - }, - { - "label": "错误信息", - "children": ( - (task.get("error") or "未知错误") - if task["status"] == TaskStatus.FAILED - else "-" - ), - "span": 3, - }, - ], - bordered=True, - column=3, - size="small", - labelStyle={ - "fontWeight": "bold", - "width": "80px", - "justifyContent": "flex-end", - "paddingRight": "8px", - }, - style={"marginBottom": "24px"}, - ), - ] - - # 添加子任务表格(如果有子任务) - subtasks = [sub_task.to_dict() for sub_task in get_sub_tasks(task["task_id"])] - - if subtasks: - content.extend( - [ - fac.AntdDivider(children="子任务列表", innerTextOrientation="left"), - # 统计子任务各状态数量 - fac.AntdSpace( - [ - fac.AntdTag( - content=f"{STATUS_LABELS[status]}({count})", - color=STATUS_COLORS[status], - ) - for status, count in Counter( - subtask["status"] for subtask in subtasks - ).items() - ], - style={ - "marginBottom": "12px", - "display": "flex", - "justifyContent": "flex-start", - }, - ), - fac.AntdTable( - id="subtask-table", - columns=[ - { - "title": "序号", - "key": "index", - "dataIndex": "index", - "width": "6%", - }, - { - "title": "任务ID", - "dataIndex": "task_id", - "key": "task_id", - "width": "20%", - }, - { - "title": "任务名称", - "dataIndex": "name", - "key": "name", - "width": "10%", - }, - { - "title": "状态", - "dataIndex": "status_tag", - "key": "status", - "width": "11%", - }, - { - "title": "进度", - "dataIndex": "progress_bar", - "key": "progress", - "width": "15%", - }, - { - "title": "开始时间", - "dataIndex": "start_time", - "key": "start_time", - "width": "18%", - }, - { - "title": "操作", - "dataIndex": "action", - "key": "action", - "width": "5%", - "renderOptions": { - "renderType": "button", - }, - }, - ], - data=[ - { - "index": i + 1, - "task_id": subtask["task_id"], - "name": create_name_with_input_params(subtask), - "status_tag": create_status_tag(subtask), - "progress_bar": fac.AntdProgress( - percent=subtask["progress"], - size="small", - ), - "start_time": ( - format_datetime(subtask["start_time"]) - if subtask.get("start_time") - else "" - ), - "action": [ - { - "icon": "antd-eye", - "type": "link", - "iconRenderer": "AntdIcon", - "custom": "view", - }, - { - "icon": "antd-reload", - "type": "link", - "iconRenderer": "AntdIcon", - "custom": "copy", - }, - ], - } - for i, subtask in enumerate(subtasks) - ], - bordered=True, - size="small", - pagination=False, - ), - ] - ) - - return content - - except Exception as e: - logger.error("获取任务详情失败: %s", str(e), exc_info=True) - return [] - - -# 修改定时更新回调 -@callback( - Output("task-detail-content", "children", allow_duplicate=True), - Output("task-detail-interval", "disabled", allow_duplicate=True), - [ - Input("task-detail-interval", "n_intervals"), - State("task-detail-task-id", "data"), - ], - prevent_initial_call=True, -) -def update_task_detail(n: int, task_id: str) -> list: - """定时更新任务详情 - - Args: - n: 定时器触发次数 - task_id: 当前查看的任务ID - - Returns: - list: 更新后的任务详情内容 - """ - if not task_id or len(task_id) < 1: - raise PreventUpdate - - task = get_record(ModelTask, {"task_id": task_id}) - if not task: - raise PreventUpdate - - logger.info("定时更新任务详情: %s", task_id) - is_update = task.status == TaskStatus.RUNNING or task.status == TaskStatus.PENDING - - return get_task_detail(task.to_dict()), not is_update diff --git a/pages/task/task_modal.py b/pages/task/task_modal.py deleted file mode 100644 index b53d4a7..0000000 --- a/pages/task/task_modal.py +++ /dev/null @@ -1,326 +0,0 @@ -"""任务创建弹窗模块 - -提供任务创建和编辑功能: -- 渲染任务创建弹窗 -- 处理任务参数表单 -- 处理任务创建和编辑操作 -""" - -import logging -from typing import Any, Dict, List, Optional - -import dash -import feffery_antd_components as fac -from dash import ALL, Input, Output, State, callback, html -from dash.exceptions import PreventUpdate - -from components.fund_code_aio import FundCodeAIO -from pages.task.task_utils import get_tasks -from scheduler.job_manager import JobManager -from scheduler.tasks import TaskFactory - -logger = logging.getLogger(__name__) - - -def render_task_modal() -> fac.AntdModal: - """渲染任务创建对话框""" - return fac.AntdModal( - id="task-modal", - title="新建任务", - visible=False, - maskClosable=False, - width=700, - renderFooter=True, - okText="确定", - cancelText="取消", - bodyStyle={ - "padding": "24px 24px 0", - }, - children=[ - fac.AntdForm( - id="task-form", - labelCol={"span": 6}, - wrapperCol={"span": 18}, - children=[ - fac.AntdFormItem( - label="任务类型", - required=True, - children=fac.AntdSelect( - id="task-type-select", - placeholder="请选择任务类型", - options=[ - { - "label": f"{config['name']} - {config['description']}", - "value": task_type, - } - for task_type, config in TaskFactory().get_task_types().items() - ], - style={ - "width": "100%", - }, - ), - ), - # 动态参数表单容器 - html.Div(id="task-params-container"), - # 基础配置项 - fac.AntdFormItem( - label="延迟时间", - tooltip="任务延迟执行的时间(秒)", - children=fac.AntdInputNumber( - id="delay-input", - placeholder="请输入延迟时间(秒)", - min=0, - style={"width": "100%"}, - ), - ), - fac.AntdFormItem( - label="超时时间", - tooltip="任务最大执行时间(秒)", - children=fac.AntdInputNumber( - id="timeout-input", - placeholder="请输入超时时间(秒)", - min=1, - style={"width": "100%"}, - ), - ), - ], - style={ - "width": "100%", - }, - ) - ], - ) - - -def generate_param_form_item( - param: Dict[str, Any], use_pattern_id: bool = True -) -> Optional[fac.AntdFormItem]: - """生成参数表单项 - - Args: - param: 参数配置 - use_pattern_id: 是否使用模式匹配ID格式 - - Returns: - 表单项组件 - """ - # 根据参数类型创建不同的输入组件 - if param["type"] == "fund-code-aio": - input_component = FundCodeAIO(aio_id=f"task-param-{param['key']}") - else: - if param["type"] == "string": - input_component = fac.AntdInput( - id=( - {"type": "task-param", "param": param["key"]} - if use_pattern_id - else f"param-{param['key']}" - ), - placeholder=param.get("description", ""), - style={"width": "100%"}, - ) - elif param["type"] == "number": - input_component = fac.AntdInputNumber( - id=( - {"type": "task-param", "param": param["key"]} - if use_pattern_id - else f"param-{param['key']}" - ), - defaultValue=param.get("default", None), - placeholder=param.get("description", ""), - style={"width": "100%"}, - ) - elif param["type"] == "select": - input_component = fac.AntdSelect( - id=( - {"type": "task-param", "param": param["key"]} - if use_pattern_id - else f"param-{param['key']}" - ), - options=param.get("select_options", []), - defaultValue=param.get("default", None), - placeholder=param.get("description", ""), - style={"width": "100%"}, - ) - elif param["type"] == "date": - input_component = fac.AntdDatePicker( - id=( - {"type": "task-param", "param": param["key"]} - if use_pattern_id - else f"param-{param['key']}" - ), - placeholder=param.get("description", ""), - style={"width": "100%"}, - ) - elif param["type"] == "boolean": - input_component = fac.AntdSwitch( - id=( - {"type": "task-param", "param": param["key"]} - if use_pattern_id - else f"param-{param['key']}" - ), - checkedChildren=fac.AntdIcon(icon="antd-check"), - unCheckedChildren=fac.AntdIcon(icon="antd-close"), - checked=True if param.get("default", False) else False, - ) - else: - logger.warning("未知的参数类型: %s", param["type"]) - return None - - return fac.AntdFormItem( - label=param["name"], - required=param.get("required", False), - tooltip=param.get("description", ""), - children=input_component, - ) - - -def generate_param_items(task_type: str, use_pattern_id: bool = True) -> List[Any]: - """生成任务参数表单项列表 - - Args: - task_type: 任务类型 - use_pattern_id: 是否使用模式匹配ID格式 - - Returns: - 参数表单组件列表 - """ - task_config = TaskFactory().get_task_types().get(task_type, {}) - params = task_config.get("params", []) - - param_items = [] - for param in params: - form_item = generate_param_form_item(param, use_pattern_id) - if form_item: - param_items.append(form_item) - - return param_items - - -@callback( - [ - Output("task-modal", "visible"), - Output("task-params-container", "children", allow_duplicate=True), - Output("task-type-select", "value", allow_duplicate=True), - Output("delay-input", "value"), - Output("timeout-input", "value"), - ], - [ - Input("add-task-btn", "nClicks"), - Input("task-type-select", "value"), - ], - prevent_initial_call=True, -) -def show_task_modal(n_clicks, task_type): - """显示任务创建对话框并更新参数表单""" - ctx = dash.callback_context - if not ctx.triggered: - raise PreventUpdate - - trigger_id = ctx.triggered[0]["prop_id"] - task_factory = TaskFactory() - - # 获取第一个可用的任务类型作为默认值 - default_task_type = next(iter(task_factory.get_task_types().keys()), None) - - if trigger_id == "add-task-btn.nClicks": - if n_clicks: - # 打开弹窗时,置默认任务类型并生成对应的参数表单 - if default_task_type: - task_config = task_factory.get_task_types().get(default_task_type, {}) - param_items = generate_param_items(default_task_type) - return ( - True, - param_items, - default_task_type, - task_config.get("delay", 0), - task_config.get("timeout"), - ) - return True, [], None, None, None - - elif trigger_id == "task-type-select.value": - if not task_type: - return dash.no_update, [], dash.no_update, None, None - - task_config = task_factory.get_task_types().get(task_type, {}) - param_items = generate_param_items(task_type) - return ( - dash.no_update, - param_items, - dash.no_update, - task_config.get("delay", 0), - task_config.get("timeout"), - ) - - return dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update - - -@callback( - [ - Output("task-modal", "visible", allow_duplicate=True), - Output("task-type-select", "value", allow_duplicate=True), - Output("delay-input", "value", allow_duplicate=True), - Output("timeout-input", "value", allow_duplicate=True), - Output("task-list-interval", "disabled", allow_duplicate=True), - ], - Input("task-modal", "okCounts"), - [ - State("task-type-select", "value"), - State({"type": "task-param", "param": ALL}, "value"), - State({"type": "task-param", "param": ALL}, "checked"), - State({"aio_id": ALL, "component": "FundCodeAIO", "subcomponent": "select"}, "value"), - State("delay-input", "value"), - State("timeout-input", "value"), - ], - prevent_initial_call=True, -) -def handle_task_create( - ok_counts: int, - task_type: str, - param_values: List[Any], - param_checked: List[bool], - fund_code_values: List[str], - delay: Optional[int], - timeout: Optional[int], -): - """处理任务创建""" - if not ok_counts or not task_type: - raise PreventUpdate - - # 获取任务参数配置 - task_config = TaskFactory().get_task_types().get(task_type, {}) - params = task_config.get("params", []) - - # 构建参数字典 - task_params = {} - param_index = 0 - fund_code_index = 0 - - for param in params: - if param["type"] == "fund-code-aio": - value = fund_code_values[fund_code_index] if fund_code_values else None - fund_code_index += 1 - elif param["type"] == "boolean": - value = param_checked[param_index] - param_index += 1 - else: - value = param_values[param_index] - param_index += 1 - - logger.debug("处理参数: %s = %s", param["key"], value) - - if param.get("required", False) and not value: - logger.warning("缺少必需参数: %s", param["name"]) - raise PreventUpdate - - task_params[param["key"]] = value - - # 创建任务 - try: - JobManager().add_task(task_type=task_type, delay=delay, timeout=timeout, **task_params) - logger.info("创建任务成功: %s", task_type) - except Exception as e: - logger.error("创建任务失败: %s", str(e), exc_info=True) - raise PreventUpdate from e - - # 关闭对话框并清空表单 - return False, None, None, None, False diff --git a/pages/task/task_page.py b/pages/task/task_page.py deleted file mode 100644 index 33df7be..0000000 --- a/pages/task/task_page.py +++ /dev/null @@ -1,73 +0,0 @@ -# 任务管理页面主渲染模块 -# -# 页面结构: -# -# 2. 页面标题 -# - 图标 + 文字标题 -# -# 3. 主要内容 -# - 任务列表表格 -# -# 4. 弹窗组件 -# - 任务创建弹窗 -# - 任务详情弹窗 -# - - -import feffery_antd_components as fac -from dash import dcc, html - -from pages.task.task_detail import render_task_detail_modal -from pages.task.task_modal import render_task_modal -from pages.task.task_table import render_task_table -from pages.task.task_utils import ICON_STYLES, PAGE_PADDING - - -def render_task_page() -> html.Div: - """渲染任务管理页面 - - Returns: - 包含完整页面结构的Div组件 - """ - return html.Div( - [ - # 页面标题 - fac.AntdRow( - fac.AntdCol( - html.Div( - [ - fac.AntdIcon( - icon="antd-schedule", - style=ICON_STYLES, - ), - "任务管理", - ], - style={ - "fontSize": "20px", - "fontWeight": "bold", - "padding": "16px 0", - "display": "flex", - "alignItems": "center", - }, - ), - span=24, - ) - ), - # 主要内容区域 - fac.AntdRow( - [ - fac.AntdCol( - [ - render_task_table(), - ], - span=24, - style={"padding": f"{PAGE_PADDING}px"}, - ), - ] - ), - # 对话框组件 - render_task_modal(), - render_task_detail_modal(), - ], - style={"padding": f"{PAGE_PADDING}px"}, - ) diff --git a/pages/task/task_table.py b/pages/task/task_table.py deleted file mode 100644 index d05ab99..0000000 --- a/pages/task/task_table.py +++ /dev/null @@ -1,217 +0,0 @@ -"""任务列表表格模块 - -提供任务列表的表格展示功能: -- 表格数据展示 -- 表格操作按钮 -- 数据刷新 -""" - -import logging - -import dash -import feffery_antd_components as fac -from dash import Input, Output, State, callback, dcc, no_update -from dash.exceptions import PreventUpdate - -from config import PAGE_CONFIG -from pages.task.task_utils import ( - TABLE_STYLES, - TASK_PAGE_SIZE, - get_tasks_with_pagination, - prepare_task_for_display, -) -from scheduler.job_manager import JobManager, TaskStatus -from kz_dash.utility.fac_helper import show_message - -logger = logging.getLogger(__name__) - - -def render_task_table() -> fac.AntdCard: - """渲染任务表格 - - Returns: - 任务表格卡片组件 - """ - return fac.AntdCard( - title="任务管理", - bordered=False, - extra=[ - fac.AntdButton( - "新建任务", - type="primary", - icon=fac.AntdIcon(icon="antd-plus"), - id="add-task-btn", - ), - ], - children=[ - # 添加定时器组件 - dcc.Interval( - id="task-list-interval", - interval=PAGE_CONFIG["TASK_LIST_INTERVAL_TIME"], - disabled=True, # 默认禁用 - ), - fac.AntdTable( - id="task-list", - columns=[ - { - "title": "任务ID", - "dataIndex": "task_id", - "key": "task_id", - "width": "15%", - }, - { - "title": "任务名称", - "dataIndex": "name", - "key": "name", - "width": "10%", - }, - { - "title": "输入参数", - "dataIndex": "input_params", - "key": "input_params", - "width": "20%", - }, - { - "title": "状态", - "dataIndex": "status_tag", - "key": "status", - "width": "10%", - }, - { - "title": "进度", - "dataIndex": "progress_bar", - "key": "progress", - "width": "15%", - }, - { - "title": "创建时间", - "dataIndex": "created_at", - "key": "created_at", - "width": "15%", - "sorter": True, - }, - { - "title": "操作", - "dataIndex": "actions", - "key": "actions", - "width": "10%", - "renderOptions": { - "renderType": "button", - }, - }, - ], - data=[], # 初始为空,由回调函数加载数据 - bordered=True, - size="small", - pagination={ - "pageSize": TASK_PAGE_SIZE, - "showSizeChanger": True, - "showQuickJumper": True, - "total": 0, # 初始为0,由回调函数更新 - "hideOnSinglePage": True, - }, - mode="server-side", # 使用服务端模式 - sortOptions={"sortDataIndexes": ["name", "created_at"]}, - # filterOptions={ - # "status_tag": { - # "filterCustomItems": [ - # {"label": label, "value": status} - # for status, label in STATUS_LABELS.items() - # ], - # } - # }, - style=TABLE_STYLES, - ), - ], - bodyStyle={"padding": "12px"}, - style={"width": "100%"}, - ) - - -@callback( - [ - Output("task-list", "data", allow_duplicate=True), - Output("task-list", "pagination", allow_duplicate=True), - Output("task-list-interval", "disabled", allow_duplicate=True), - ], - [ - Input("task-list", "pagination"), - Input("task-list", "sorter"), - Input("task-list", "filters"), - Input("task-list-interval", "n_intervals"), - ], - prevent_initial_call=True, -) -def update_task_list(pagination, sorter, filters, n_intervals): - """更新任务列表""" - ctx = dash.callback_context - triggered_id = ctx.triggered[0]["prop_id"].split(".")[0] if ctx.triggered else None - - # 获取当前页码和每页数量 - current_page = pagination.get("current", 1) if pagination else 1 - page_size = pagination.get("pageSize", TASK_PAGE_SIZE) if pagination else TASK_PAGE_SIZE - - # 获取最新数据 - result = get_tasks_with_pagination( - page=current_page, - page_size=page_size, - filters=filters, - sorter=sorter, - ) - - # 准备显示数据 - display_data = [prepare_task_for_display(task) for task in result["data"]] - - # 更新分页信息 - pagination_info = { - "current": current_page, - "pageSize": page_size, - "total": result["total"], - "showSizeChanger": True, - "showQuickJumper": True, - } - - # 检查是否有运行中的任务 - has_running_tasks = any( - task["status"] in [TaskStatus.PENDING, TaskStatus.RUNNING] for task in result["data"] - ) - - return display_data, pagination_info, not has_running_tasks - - -# 添加回调函数 -@callback( - Output("task-list", "pagination", allow_duplicate=True), - [ - Input("task-list", "nClicksButton"), - State("task-list", "clickedCustom"), - State("task-list", "recentlyButtonClickedRow"), - State("task-list", "pagination"), - ], - prevent_initial_call=True, -) -def handle_task_list_action(nClicks, custom, recentlyButtonClickedRow, pagination): - """处理任务列表操作的回调""" - if not nClicks: - raise PreventUpdate - - task_id = recentlyButtonClickedRow["task_id"] - if custom == "view": - # 查看任务详情,已单独处理 - return no_update - elif custom == "copy": - JobManager().copy_task(task_id) - # 重新加载任务列表 - new_pagination = pagination.copy() - new_pagination["current"] = 1 - return new_pagination - elif custom == "delete": - try: - JobManager().delete_task(task_id) - new_pagination = pagination.copy() - show_message(message="任务已删除", display_type="success") - return new_pagination - except Exception as e: - show_message(message=str(e), display_type="error") - - return no_update diff --git a/pages/task/task_utils.py b/pages/task/task_utils.py deleted file mode 100644 index 08ef498..0000000 --- a/pages/task/task_utils.py +++ /dev/null @@ -1,294 +0,0 @@ -"""通用工具函数模块 - -包含: -- 任务状态常量 -- 样式常量 -- 工具函数 -""" - -import json -from typing import Any, Dict, List - -import feffery_antd_components as fac -from dash import html -from dash.development.base_component import Component -from feffery_utils_components import FefferyJsonViewer - -from kz_dash.models.database import get_record_count, get_record_list -from models.task import ModelTask -from scheduler.job_manager import JobManager, TaskStatus -from kz_dash.utility.datetime_helper import format_datetime - -# ============= 状态常量 ============= -STATUS_LABELS = { - TaskStatus.PENDING: "等待中", - TaskStatus.RUNNING: "运行中", - TaskStatus.COMPLETED: "已完成", - TaskStatus.FAILED: "失败", - TaskStatus.PAUSED: "已暂停", -} - -STATUS_COLORS = { - TaskStatus.PENDING: "blue", - TaskStatus.RUNNING: "processing", - TaskStatus.COMPLETED: "success", - TaskStatus.FAILED: "error", - TaskStatus.PAUSED: "warning", -} - -# ============= 样式常量 ============= -PAGE_PADDING = 24 -TABLE_STYLES = {"marginTop": "8px", "width": "100%"} -ICON_STYLES = {"fontSize": "24px", "marginRight": "8px"} - -# ============= 任务常量 ============= -TASK_PAGE_SIZE = 100 - - -# ============= 工具函数 ============= -def get_tasks() -> List[ModelTask]: - """获取任务列表""" - return get_record_list(ModelTask, {"parent_task_id__null": True}, order_by=["-created_at"]) - - -def get_sub_tasks(task_id: str) -> List[ModelTask]: - """获取子任务列表""" - return get_record_list(ModelTask, {"parent_task_id": task_id}, order_by=["-created_at"]) - - -def get_tasks_with_pagination( - page: int = 1, page_size: int = TASK_PAGE_SIZE, filters: dict = None, sorter: dict = None -) -> Dict[str, Any]: - """获取分页任务列表 - - Args: - page: 当前页码 - page_size: 每页数量 - filters: 过滤条件 - sorter: 排序条件 - - Returns: - 包含分页数据的字典: - { - 'total': 总记录数, - 'data': 当前页数据列表 - } - """ - # 计算偏移量 - offset = (page - 1) * page_size - - # 构建查询条件 - query = {"parent_task_id__null": True} - # if filters: - # # 处理状态过滤 - # status_filter = filters.get("status") - # if status_filter: - # query["status__in"] = status_filter - - # 构建排序条件 - order_by = ["-created_at"] # 默认按创建时间倒序 - if sorter: - field = sorter.get("field") - order = sorter.get("order") - if field and order: - # 处理特殊字段 - if field == "task_id": - field = "id" - order_by = [f"{'-' if order == 'descend' else ''}{field}"] - - # 获取总记录数 - total = get_record_count(ModelTask, query) - - # 获取分页数据 - tasks = get_record_list( - ModelTask, search_fields=query, order_by=order_by, offset=offset, limit=page_size - ) - - return {"total": total, "data": [task.to_dict() for task in tasks]} - - -def prepare_task_for_display(task: Dict[str, Any]) -> Dict[str, Any]: - """将基础任务字典转换为包含UI组件的显示数据 - - 此函数接收由convert_tasks_to_dict生成的基础字典,添加用于显示的UI组件,主要用于: - 1. 添加状态标签组件(AntdTag) - 2. 添加进度条组件(AntdProgress) - 3. 添加操作按钮组件 - 4. 格式化日期时间 - - Args: - task: 基础任务字典 - - Returns: - 包含UI组件的显示数据字典 - """ - # 处理输入参数 - input_params = task.get("input_params", None) - if input_params: - try: - if isinstance(input_params, str): - input_params = json.loads(input_params) - except json.JSONDecodeError: - input_params = {"error": "无法解析的参数"} - - return { - **task, - "created_at": format_datetime(task["created_at"]) if task["created_at"] else "-", - "status_tag": create_status_tag(task), - "progress_bar": fac.AntdProgress( - percent=task["progress"], - size="small", - ), - "input_params": ( - html.Div( - [ - FefferyJsonViewer( - data=input_params, - quotesOnKeys=False, - enableClipboard=False, - displayDataTypes=False, - displayObjectSize=False, - style={ - "fontSize": "12px", - "backgroundColor": "transparent", - "padding": "0", - "textAlign": "left", - }, - collapseStringsAfterLength=False, - ), - ], - style={ - "maxHeight": "200px", - "overflow": "auto", - "textAlign": "left", - "padding": "4px", - }, - ) - if input_params - else "-" - ), - "actions": [ - {"icon": "antd-eye", "type": "link", "iconRenderer": "AntdIcon", "custom": "view"}, - { - "icon": "antd-close", - "type": "link", - "danger": True, - "iconRenderer": "AntdIcon", - "custom": "delete", - }, - { - "icon": "antd-reload", - "type": "link", - "iconRenderer": "AntdIcon", - "custom": "copy", - }, - ], - } - - -# def create_operation_buttons(task: Dict[str, Any]) -> fac.AntdSpace: -# """创建操作按钮 - -# Args: -# task: 任务数据 - -# Returns: -# 操作按钮组件 -# """ -# buttons = [ -# fac.AntdButton( -# fac.AntdIcon(icon="antd-eye"), -# type="link", -# id={ -# "type": "task-action", -# "index": task["task_id"], -# "action": "view", -# }, -# ), -# fac.AntdButton( -# fac.AntdIcon(icon="antd-reload"), -# type="link", -# id={ -# "type": "task-action", -# "index": task["task_id"], -# "action": "copy", -# }, -# ), -# ] - -# # 添加暂停/恢复按钮 -# if task["status"] in [TaskStatus.RUNNING, TaskStatus.PAUSED]: -# buttons.append( -# fac.AntdButton( -# fac.AntdIcon( -# icon=( -# "antd-pause-circle" -# if task["status"] == TaskStatus.RUNNING -# else "antd-play-circle" -# ) -# ), -# type="link", -# id={ -# "type": "task-action", -# "index": task["task_id"], -# "action": ("pause" if task["status"] == TaskStatus.RUNNING else "resume"), -# }, -# ) -# ) - -# return fac.AntdSpace(buttons) - - -def create_status_tag(task: dict, show_error: bool = True) -> Component: - """创建状态标签配置""" - if show_error and task["status"] == TaskStatus.FAILED: - return fac.AntdTooltip( - fac.AntdTag( - content="失败", - color="error", - ), - title=task.get("error") or "未知错误", - ) - else: - return fac.AntdTag( - content=STATUS_LABELS.get(task["status"], "未知"), - color=STATUS_COLORS.get(task["status"], "default"), - ) - - -def create_name_with_input_params(task: dict) -> Component: - """创建任务名称和输入参数""" - return fac.AntdPopover( - task["name"], - title="输入参数", - content=( - ( - html.Div( - [ - FefferyJsonViewer( - data=json.loads(task["input_params"]), - quotesOnKeys=False, - enableClipboard=False, - displayDataTypes=False, - displayObjectSize=False, - style={ - "fontSize": "12px", - "backgroundColor": "transparent", - "padding": "0", - "textAlign": "left", - }, - collapseStringsAfterLength=False, - ), - ], - style={ - "maxHeight": "200px", - "overflow": "auto", - "textAlign": "left", - "padding": "4px", - }, - ) - if task["input_params"] - else "-" - ), - ), - ) diff --git a/pages/transaction/modal.py b/pages/transaction/modal.py index 512ab7d..2a9f511 100644 --- a/pages/transaction/modal.py +++ b/pages/transaction/modal.py @@ -20,7 +20,7 @@ from models.database import get_transactions from models.fund import ModelFundNav from models.fund_user import ModelFundTransaction -from scheduler.job_manager import JobManager +from kz_dash.scheduler.job_manager import JobManager from kz_dash.utility.fac_helper import show_message from kz_dash.utility.string_helper import get_uuid diff --git a/scheduler/__init__.py b/scheduler/__init__.py deleted file mode 100644 index 4dbe2c5..0000000 --- a/scheduler/__init__.py +++ /dev/null @@ -1,31 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, Optional - -from flask_apscheduler import APScheduler - -from scheduler.tasks import TaskStatus - -scheduler = APScheduler() - - -class TaskResult: - def __init__(self): - self.status = TaskStatus.PENDING - self.progress = 0 - self.result = None - self.error = None - self.start_time: Optional[datetime] = None - self.end_time: Optional[datetime] = None - - def to_dict(self) -> Dict[str, Any]: - return { - "status": self.status, - "progress": self.progress, - "result": self.result, - "error": self.error, - "start_time": self.start_time.isoformat() if self.start_time else None, - "end_time": self.end_time.isoformat() if self.end_time else None, - } - - -task_results = {} diff --git a/scheduler/job_manager.py b/scheduler/job_manager.py deleted file mode 100644 index 228d73c..0000000 --- a/scheduler/job_manager.py +++ /dev/null @@ -1,431 +0,0 @@ -import json -import logging -from datetime import datetime, timedelta -from typing import Dict, List - -from flask_apscheduler import APScheduler -from peewee import DatabaseError, IntegrityError - -from config import SCHEDULER_CONFIG -from kz_dash.models.database import delete_record, get_record, get_record_list, update_record -from models.task import ModelTask -from scheduler.tasks import TaskFactory, TaskStatus -from kz_dash.utility.singleton import Singleton -from kz_dash.utility.string_helper import get_uuid - -# 创建调度器实例 -scheduler = APScheduler() -logger = logging.getLogger(__name__) - - -class TaskExecutionError(Exception): - """任务执行异常""" - - -class TaskUpdateError(Exception): - """任务状态更新异常""" - - -@Singleton -class JobManager: - """任务管理器 - - 主要功能: - 1. 管理任务的生命周期(创建、执行、完成) - 2. 维护任务进度缓存,减少数据库操作 - 3. 提供任务状态查询接口 - """ - - def __init__(self): - self.scheduler = scheduler - self._progress_cache = {} # {task_id: progress} 任务进度缓存 - logger.debug("初始化任务管理器") - - def init_app(self, app): - """初始化Flask应用的任务调度器""" - app.config.update(SCHEDULER_CONFIG) - self.scheduler.init_app(app) - self.scheduler.start() - logger.info("任务调度器初始化成功") - self.restore_tasks() - - def _update_task_status(self, task: ModelTask): - """更新任务状态 - 1. 如果是普通任务,则直接任务状态设置成完成 - 2. 如果是父任务, - * 如果存在子任务,则根据子任务状态设置状态 - * 根据子任务设置错误信息 - 3. 如果存在父任务,则根据子任务状态设置父任务状态 - """ - - # 如果存在子任务,则任务状态设置成运行中 - sub_tasks = task.sub_tasks - if len(sub_tasks) > 0: - # 统计子任务状态 - sub_task_status_count = { - TaskStatus.RUNNING: 0, - TaskStatus.COMPLETED: 0, - TaskStatus.FAILED: 0, - } - for sub_task in sub_tasks: - if sub_task.status in sub_task_status_count: - sub_task_status_count[sub_task.status] += 1 - - # 计算任务进度 - # 默认进度10%为任务启动,90%为子任务进度 - task.progress = ( - sub_task_status_count[TaskStatus.COMPLETED] - + sub_task_status_count[TaskStatus.FAILED] - + sub_task_status_count[TaskStatus.RUNNING] / 2 - ) * 90 / len(task.sub_tasks) + 10 - if sub_task_status_count[TaskStatus.FAILED] > 0: - task.status = TaskStatus.FAILED - task.error = "子任务错误" - elif sub_task_status_count[TaskStatus.COMPLETED] == len(task.sub_tasks): - task.status = TaskStatus.COMPLETED - else: - task.status = TaskStatus.RUNNING - else: - task.status = TaskStatus.COMPLETED - - def _task_wrapper(self, task_type: str, task_id: str, **kwargs): - """任务执行包装器 - - 职责: - 1. 更新任务执行状态 - 2. 捕获和记录任务异常 - 3. 同步任务最终进度到数据库 - """ - try: - # 更新任务状态为运行中 - - task_history = get_record(ModelTask, {"task_id": task_id}) - - if not task_history: - logger.error("任务不存在: %s", task_id) - raise TaskExecutionError(f"任务不存在: {task_id}") - - task_history.status = TaskStatus.RUNNING - task_history.start_time = datetime.now() - - logger.info("开始执行任务: %s (ID: %s)", task_type, task_id) - - # 执行具体任务 - try: - result = TaskFactory().execute_task(task_type, task_id, **kwargs) - except (ValueError, TypeError) as e: - raise TaskExecutionError(f"任务参数错误: {str(e)}") from e - except Exception as e: - raise TaskExecutionError(f"任务执行失败: {str(e)}") from e - - # 更新任务状态 - self._update_task_status(task_history) - task_history.result = json.dumps(result, ensure_ascii=False, default=str) - - # 处理父任务 - if task_history.parent_task: - task_history.save() - self._update_task_status(task_history.parent_task) - task_history.parent_task.save() - - logger.info("任务执行更新: %s (ID: %s)->%d%%", task_type, task_id, task_history.progress) - - except TaskExecutionError as e: - # 任务执行失败 - task_history.status = TaskStatus.FAILED - task_history.error = str(e) - logger.error("任务执行失败: %s (ID: %s)", task_type, task_id, exc_info=True) - raise - - finally: - # 同步最终状态到数据库 - try: - # 优先使用缓存中的进度 - if task_id in self._progress_cache: - task_history.progress = self._progress_cache[task_id] - del self._progress_cache[task_id] - # 任务完成时设为100% - elif task_history.status == TaskStatus.COMPLETED: - task_history.progress = 100 - - task_history.end_time = datetime.now() - task_history.save() - logger.debug( - "任务最终状态: %s -> status=%s, progress=%s%%", - task_id, - task_history.status, - task_history.progress, - ) - except (DatabaseError, IntegrityError) as e: - logger.error("保存任务最终状态失败: %s", str(e)) - raise TaskUpdateError(f"保存任务状态失败: {str(e)}") from e - - def _restore_task(self, task: ModelTask, min_delay: int): - """恢复任务""" - restored_count = 0 - # 解析任务参数 - args = json.loads(task.input_params) - - # 单独处理父任务 - if len(task.sub_tasks) > 0: - # 统计子任务状态 - sub_task_status_count = { - TaskStatus.RUNNING: 0, - TaskStatus.PENDING: 0, - } - # 优先处理子任务 - for sub_task in task.sub_tasks: - if sub_task.status in [TaskStatus.RUNNING, TaskStatus.PENDING]: - restored_count += self._restore_task(sub_task, min_delay) - - # 统计子任务状态 - for sub_task in task.sub_tasks: - if sub_task.status in sub_task_status_count: - sub_task_status_count[sub_task.status] += 1 - - if sub_task_status_count[TaskStatus.PENDING] > 0: - task.status = TaskStatus.RUNNING - elif sub_task_status_count[TaskStatus.RUNNING] > 0: - task.status = TaskStatus.FAILED - task.error = "子任务超时" - else: - task.status = TaskStatus.COMPLETED - task.save() - else: - if task.status == TaskStatus.RUNNING: - # 如果任务是运行状态,则改成超时失败 - task.status = TaskStatus.FAILED - task.error = "任务超时" - - task.save() - elif task.status == TaskStatus.PENDING: - # 重新添加到调度器 - task.delay = task.delay - min_delay - if task.delay < 0: - task.delay = 0 - task.save() - self.scheduler.add_job( - func=self._task_wrapper, - args=(task.type, task.task_id), - kwargs=args, - id=task.task_id, - name=task.name, - trigger="date", - next_run_time=datetime.now() + timedelta(seconds=task.delay + 1), - misfire_grace_time=SCHEDULER_CONFIG["SCHEDULER_JOB_DEFAULTS"][ - "misfire_grace_time" - ], - ) - restored_count += 1 - logger.debug("恢复任务: %s", task.task_id) - return restored_count - - def restore_tasks(self): - """恢复任务 - - 从数据库中恢复未完成的任务 - """ - try: - # 查询所有未完成的任务 - unfinished_tasks = get_record_list( - ModelTask, {"status__in": [TaskStatus.PENDING, TaskStatus.RUNNING]} - ) - - restored_count = 0 - - # 获取延迟最小值 - min_delay = 3600 - for task in unfinished_tasks: - if task.status == TaskStatus.PENDING and task.delay > 0: - min_delay = min(min_delay, task.delay) - - for task in unfinished_tasks: - try: - restored_count += self._restore_task(task, min_delay) - - except Exception as e: - logger.error("恢复任务失败 %s: %s", task.task_id, str(e), exc_info=True) - continue - - if restored_count > 0: - logger.info("成功恢复 %d 个任务", restored_count) - - except Exception as e: - logger.error("恢复任务失败: %s", str(e), exc_info=True) - - def add_task( - self, task_type: str, delay: int = 0, timeout=0, parent_task_id=None, **kwargs - ) -> str: - """添加新任务 - 为了保证任务正常执行,延时1+delay秒执行 - - Args: - task_type: 任务类型 - delay: 延迟执行时间(秒),默认0秒. - timeout: 任务超时时间(秒),默认0,采用任务类型对应的默认超时时间 - parent_task_id: 父任务ID - **kwargs: 任务参数 - - Returns: - task_id: 新创建的任务ID - - Raises: - ValueError: 任务参数验证失败 - """ - # 验证任务参数 - is_valid, error_message = TaskFactory().validate_task_params(task_type, kwargs) - if not is_valid: - logger.error("任务参数验证失败: %s", error_message) - raise ValueError(error_message) - - task_id = get_uuid() - task_config = TaskFactory().get_task_types().get(task_type, {}) - if timeout == 0: - timeout = task_config.get("timeout", SCHEDULER_CONFIG["DEFAULT_TIMEOUT"]) - - # 创建任务记录 - update_record( - ModelTask, - {"task_id": task_id}, - { - "parent_task_id": parent_task_id, - "type": task_type, # 设置任务类型 - "name": task_config.get("name", task_type), - "delay": delay, - "status": TaskStatus.PENDING, - "timeout": timeout, - "input_params": json.dumps(kwargs), # 将参数转换为JSON字符串 - }, - ) - - # 添加到调度器立即执行 - self.scheduler.add_job( - func=self._task_wrapper, - args=(task_type, task_id), - kwargs=kwargs, - id=task_id, - name=task_config.get("name", task_type), - trigger="date", - next_run_time=datetime.now() + timedelta(seconds=delay + 1), - misfire_grace_time=SCHEDULER_CONFIG["SCHEDULER_JOB_DEFAULTS"]["misfire_grace_time"], - ) - - return task_id - - def copy_task(self, task_id: str) -> str: - """复制任务""" - try: - task = get_record(ModelTask, search_fields={"task_id": task_id}) - if not task: - logger.error("任务不存在: %s", task_id) - raise TaskExecutionError(f"任务不存在: {task_id}") - - new_task_id = get_uuid() - - args = json.loads(task.input_params) - - update_record( - ModelTask, - {"task_id": new_task_id}, - { - "type": task.type, - "name": task.name, - "delay": task.delay, - "status": TaskStatus.PENDING, - "timeout": task.timeout, - "input_params": task.input_params, - }, - ) - logger.info("任务复制成功: %s -> %s", task_id, new_task_id) - - # 添加到调度器立即执行 - self.scheduler.add_job( - func=self._task_wrapper, - args=(task.type, new_task_id), - kwargs=args, - id=new_task_id, - name=task.name, - trigger="date", - misfire_grace_time=SCHEDULER_CONFIG["SCHEDULER_JOB_DEFAULTS"]["misfire_grace_time"], - ) - return new_task_id - except (DatabaseError, IntegrityError) as e: - logger.error("复制任务失败: %s", str(e), exc_info=True) - return "" - - def update_task_progress(self, task_id: str, progress: int): - """更新任务进度到缓存""" - self._progress_cache[task_id] = progress - logger.debug("更新任务进度缓存: %s -> %d%%", task_id, progress) - - def get_task_progress(self, task_ids: List[str]) -> Dict[str, int]: - """获取多个任务的进度 - - Args: - task_ids: 任务ID列表 - - Returns: - Dict[str, int]: {task_id: progress} 进度字典 - """ - progress_dict = {} - - # 先从缓存获取 - for task_id in task_ids: - if task_id in self._progress_cache: - progress_dict[task_id] = self._progress_cache[task_id] - - # 缓存未命中的从数据库查询 - missing_task_ids = list(set(task_ids) - set(progress_dict.keys())) - if missing_task_ids: - try: - query = ModelTask.select(ModelTask.task_id, ModelTask.progress).where( - ModelTask.task_id.in_(missing_task_ids) - ) - progress_dict.update({task.task_id: task.progress for task in query.execute()}) - except (DatabaseError, IntegrityError) as e: - logger.error("从数据库获取任务进度失败: %s", str(e)) - - return progress_dict - - def _delete_task(self, task: ModelTask): - """删除指定任务及其所有子任务""" - try: - # 1. 获取任务信息 - sub_tasks = task.sub_tasks - if len(sub_tasks) > 0: - for sub_task in sub_tasks: - self._delete_task(sub_task) - - # 2. 从调度器中移除任务 - if self.scheduler.get_job(task.task_id): - try: - self.scheduler.remove_job(task.task_id) - except Exception as e: - logger.error(f"从调度器移除任务失败 {task.task_id}: {e}") - - # 3. 从数据库中删除任务 - if not delete_record(ModelTask, {"task_id": task.task_id}): - logger.error(f"从数据库删除任务失败 {task.task_id}") - raise TaskExecutionError(f"删除任务失败: {task.task_id}") - - except Exception as e: - logger.error(f"删除任务失败 {task.task_id}: {e}", exc_info=True) - raise - - def delete_task(self, task_id: str): - """删除指定任务及其所有子任务 - - Args: - task_id: 要删除的任务ID - """ - try: - task = get_record(ModelTask, {"task_id": task_id}) - if not task: - logger.error("任务不存在: %s", task_id) - raise TaskExecutionError(f"任务不存在: {task_id}") - - self._delete_task(task) - - except Exception as e: - logger.error(f"Error deleting task {task_id}: {e}") - raise e diff --git a/scheduler/tasks/__init__.py b/scheduler/tasks/__init__.py deleted file mode 100644 index e584265..0000000 --- a/scheduler/tasks/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -from scheduler.tasks.fund_nav import FundNavTask -from scheduler.tasks.sync_fund_nav import SyncFundNavTask -from scheduler.tasks.sync_fund_page import SyncFundListPageTask - -from .data_sync import DataSyncTask -from .fund_detail import FundDetailTask -from .fund_info import FundInfoTask -from .task_factory import TaskFactory - - -class TaskStatus: - """任务状态常量""" - - PENDING = "等待中" # 等待执行 - RUNNING = "运行中" # 正在执行 - COMPLETED = "已完成" # 执行完成 - FAILED = "失败" # 执行失败 - TIMEOUT = "超时" # 超时 - PAUSED = "已暂停" # 已暂停 - - -def init_tasks(): - """初始化任务类型""" - factory = TaskFactory() # 获取单例实例 - - factory.register(DataSyncTask) - # factory.register(PortfolioUpdateTask) - factory.register(FundInfoTask) - factory.register(FundDetailTask) - factory.register(FundNavTask) - factory.register(SyncFundNavTask) # 同步基金净值 - factory.register(SyncFundListPageTask) # 同步基金净值列表页面 - - -__all__ = ["TaskFactory", "TaskStatus", "init_tasks"] diff --git a/scheduler/tasks/task_factory.py b/scheduler/tasks/task_factory.py deleted file mode 100644 index d19127c..0000000 --- a/scheduler/tasks/task_factory.py +++ /dev/null @@ -1,85 +0,0 @@ -import logging -from typing import Any, Dict, Tuple, Type - -from config import DEBUG -from scheduler.tasks.base import BaseTask -from kz_dash.utility.singleton import Singleton - -logger = logging.getLogger(__name__) - - -@Singleton -class TaskFactory: - """任务工厂(单例模式)""" - - def __init__(self): - self._tasks: Dict[str, Type[BaseTask]] = {} - - def register(self, task_class: Type[BaseTask]) -> None: - """注册新的任务类型""" - task_type = task_class.get_type() - logger.info("注册任务类型: %s", task_type) - self._tasks[task_type] = task_class - - if DEBUG: - config = task_class.get_config() - logger.debug("任务配置: %s", task_type) - for key, value in config.items(): - logger.debug(" %s: %s", key, value) - - def validate_task_params(self, task_type: str, params: Dict[str, Any]) -> Tuple[bool, str]: - """验证任务参数 - - Args: - task_type: 任务类型 - params: 任务参数 - - Returns: - (is_valid, error_message): 验证结果元组 - - is_valid: 参数是否有效 - - error_message: 错误信息(验证失败时) - """ - if task_type not in self._tasks: - logger.error("未知的任务类型: %s", task_type) - return False, f"未知的任务类型: {task_type}" - - task_class = self._tasks[task_type] - return task_class.validate_params(params) - - def create(self, task_type: str, task_id: str) -> BaseTask: - """创建任务实例""" - if task_type not in self._tasks: - error_msg = f"未知的任务类型: {task_type}" - logger.error(error_msg) - raise ValueError(error_msg) - - logger.debug("创建任务实例: %s (ID: %s)", task_type, task_id) - return self._tasks[task_type](task_id) - - def execute_task(self, task_type: str, task_id: str, **kwargs) -> Dict[str, Any]: - """执行任务""" - logger.info("开始执行任务: %s (ID: %s)", task_type, task_id) - - # 先验证参数 - is_valid, error_message = self.validate_task_params(task_type, kwargs) - if not is_valid: - logger.error("任务参数验证失败: %s", error_message) - raise ValueError(error_message) - - task = self.create(task_type, task_id) - try: - result = task.execute(**kwargs) - logger.info("任务执行完成: %s (ID: %s)", task_type, task_id) - return result - except Exception as e: - logger.exception("任务执行失败: %s (ID: %s): %s", task_type, task_id, str(e)) - raise - - def get_task_types(self) -> Dict[str, Dict[str, Any]]: - """获取所有任务类型的配置""" - return {task_type: task_class.get_config() for task_type, task_class in self._tasks.items()} - - def get_available_tasks(self) -> Dict[str, Type[BaseTask]]: - """获取所有可用的任务类型""" - logger.debug("获取所有可用任务类型") - return self._tasks.copy() diff --git a/scheduler/tasks/data_sync.py b/task/data_sync.py similarity index 98% rename from scheduler/tasks/data_sync.py rename to task/data_sync.py index e3fc884..b4dcdb0 100644 --- a/scheduler/tasks/data_sync.py +++ b/task/data_sync.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import Any, Dict -from .base import BaseTask +from kz_dash.scheduler.base_task import BaseTask logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/fund_detail.py b/task/fund_detail.py similarity index 97% rename from scheduler/tasks/fund_detail.py rename to task/fund_detail.py index 30838d6..b793bc8 100644 --- a/scheduler/tasks/fund_detail.py +++ b/task/fund_detail.py @@ -6,7 +6,8 @@ from kz_dash.models.database import update_record from models.fund import ModelFund -from .base import PARAM_FUND_CODE, BaseTask +from task.task_config import PARAM_FUND_CODE +from kz_dash.scheduler.base_task import BaseTask logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/fund_info.py b/task/fund_info.py similarity index 96% rename from scheduler/tasks/fund_info.py rename to task/fund_info.py index 316119a..13b179e 100644 --- a/scheduler/tasks/fund_info.py +++ b/task/fund_info.py @@ -4,7 +4,8 @@ from data_source.proxy import DataSourceProxy -from .base import PARAM_FUND_CODE, BaseTask +from task.task_config import PARAM_FUND_CODE +from kz_dash.scheduler.base_task import BaseTask logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/fund_nav.py b/task/fund_nav.py similarity index 97% rename from scheduler/tasks/fund_nav.py rename to task/fund_nav.py index ff74ed2..a59c8c6 100644 --- a/scheduler/tasks/fund_nav.py +++ b/task/fund_nav.py @@ -4,10 +4,12 @@ from data_source.proxy import DataSourceProxy from kz_dash.models.database import update_record +from kz_dash.scheduler.base_task import BaseTask from models.fund import ModelFundNav from kz_dash.utility.datetime_helper import get_date_str_after_days, get_days_between_dates -from .base import PARAM_FUND_CODE, BaseTask +from task.task_config import PARAM_FUND_CODE + logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/portfolio.py b/task/portfolio.py similarity index 100% rename from scheduler/tasks/portfolio.py rename to task/portfolio.py diff --git a/scheduler/tasks/sync_fund_list.py b/task/sync_fund_list.py similarity index 100% rename from scheduler/tasks/sync_fund_list.py rename to task/sync_fund_list.py diff --git a/scheduler/tasks/sync_fund_nav.py b/task/sync_fund_nav.py similarity index 95% rename from scheduler/tasks/sync_fund_nav.py rename to task/sync_fund_nav.py index e9c7e88..ad1fef5 100644 --- a/scheduler/tasks/sync_fund_nav.py +++ b/task/sync_fund_nav.py @@ -6,10 +6,11 @@ from data_source.proxy import DataSourceProxy from kz_dash.models.database import get_record from models.fund import ModelFund -from scheduler.tasks.task_factory import TaskFactory +from kz_dash.scheduler.task_factory import TaskFactory from kz_dash.utility.datetime_helper import get_date_str_after_days, get_days_between_dates -from .base import PARAM_FUND_CODE, PARAM_SUB_TASK_DELAY, BaseTask +from task.task_config import PARAM_FUND_CODE, PARAM_SUB_TASK_DELAY +from kz_dash.scheduler.base_task import BaseTask logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/sync_fund_page.py b/task/sync_fund_page.py similarity index 96% rename from scheduler/tasks/sync_fund_page.py rename to task/sync_fund_page.py index 31213e6..5169e18 100644 --- a/scheduler/tasks/sync_fund_page.py +++ b/task/sync_fund_page.py @@ -7,13 +7,8 @@ from kz_dash.models.database import get_record, get_record_count, update_record from models.fund import ModelFund, ModelFundNav -from .base import ( - PARAM_FUND_TYPE, - PARAM_PAGE, - PARAM_PAGE_SIZE, - PARAM_SUB_TASK_DELAY, - BaseTask, -) +from task.task_config import PARAM_FUND_TYPE, PARAM_PAGE, PARAM_PAGE_SIZE, PARAM_SUB_TASK_DELAY +from kz_dash.scheduler.base_task import BaseTask logger = logging.getLogger(__name__) diff --git a/scheduler/tasks/base.py b/task/task_config.py similarity index 100% rename from scheduler/tasks/base.py rename to task/task_config.py diff --git a/task/task_init.py b/task/task_init.py new file mode 100644 index 0000000..f2ba7a8 --- /dev/null +++ b/task/task_init.py @@ -0,0 +1,20 @@ +from kz_dash.scheduler.task_factory import TaskFactory +from task.fund_nav import FundNavTask +from task.sync_fund_nav import SyncFundNavTask +from task.sync_fund_page import SyncFundListPageTask +from task.data_sync import DataSyncTask +from task.fund_detail import FundDetailTask +from task.fund_info import FundInfoTask + + +def init_tasks(): + """初始化任务类型""" + factory = TaskFactory() # 获取单例实例 + + factory.register(DataSyncTask) + # factory.register(PortfolioUpdateTask) + factory.register(FundInfoTask) + factory.register(FundDetailTask) + factory.register(FundNavTask) + factory.register(SyncFundNavTask) # 同步基金净值 + factory.register(SyncFundListPageTask) # 同步基金净值列表页面