Skip to content

Commit

Permalink
复制任务
Browse files Browse the repository at this point in the history
  • Loading branch information
kingzeus committed Nov 25, 2024
1 parent dfbf87b commit c99c585
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 40 deletions.
12 changes: 3 additions & 9 deletions backend/apis/account.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import uuid

from flask_restx import Namespace, Resource, fields

from backend.apis.common import create_list_response_model, create_response_model
from models.account import ModelAccount, update_account
from models.database import (
delete_record,
get_record,
get_record_list,
update_record,
)
from models.database import delete_record, get_record, get_record_list, update_record
from utils.response import format_response

api = Namespace("accounts", description="账户相关操作")
Expand Down Expand Up @@ -54,9 +50,7 @@ def post(self):
"""创建新账户"""
data = api.payload
account_id = update_account(str(uuid.uuid4()), data)
return format_response(
data=get_record(ModelAccount, {"id": account_id}), message="账户创建成功"
)
return format_response(data=get_record(ModelAccount, {"id": account_id}), message="账户创建成功")


@api.route("/<string:account_id>")
Expand Down
8 changes: 2 additions & 6 deletions backend/apis/portfolio.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import uuid

from flask_restx import Namespace, Resource, fields

from backend.apis.common import create_list_response_model, create_response_model
from models.account import ModelPortfolio
from models.database import (
delete_record,
get_record,
get_record_list,
update_record,
)
from models.database import delete_record, get_record, get_record_list, update_record
from utils.response import format_response

api = Namespace("portfolios", description="投资组合相关操作")
Expand Down
3 changes: 2 additions & 1 deletion models/account.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Dict, Optional
import uuid
from typing import Any, Dict, Optional

from peewee import BooleanField, CharField, ForeignKeyField

from models.base import BaseModel
Expand Down
11 changes: 3 additions & 8 deletions models/database.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from collections.abc import Callable
import logging
import uuid
from collections.abc import Callable
from datetime import datetime
from typing import Any, Dict, List, Optional

from peewee import JOIN, fn
from playhouse.shortcuts import update_model_from_dict


from utils.datetime_helper import format_datetime

from .account import ModelAccount, ModelPortfolio
Expand Down Expand Up @@ -531,13 +530,9 @@ def delete_record(
if callback_before:
callback_before(record)
record.delete_instance()
logger.info(
"成功删除记录 - 模型: %s, 条件: %s", model_class.__name__, str(search_fields)
)
logger.info("成功删除记录 - 模型: %s, 条件: %s", model_class.__name__, str(search_fields))
return True
logger.warning(
"未找到要删除的记录 - 模型: %s, 条件: %s", model_class.__name__, str(search_fields)
)
logger.warning("未找到要删除的记录 - 模型: %s, 条件: %s", model_class.__name__, str(search_fields))
return False
except Exception as e:
logger.error(
Expand Down
1 change: 0 additions & 1 deletion pages/account/account_modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ def handle_account_create_or_edit(
- 账户描述输入框值
"""
if ok_counts and name and validate_status == "success":

update_account(
editing_id,
{"name": name, "description": description},
Expand Down
2 changes: 1 addition & 1 deletion pages/account/portfolio_modal.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional, Tuple
import uuid
from typing import Optional, Tuple

import feffery_antd_components as fac
from dash import Input, Output, State, callback, dcc
Expand Down
2 changes: 1 addition & 1 deletion pages/task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
- table.py: 任务列表表格相关
- modal.py: 任务创建弹窗及其回调
- detail_modal.py: 任务详情弹窗相关
- utils.py: 通用工具函数和常量
- task_utils.py: 通用工具函数和常量
"""

from pages.task.task_page import render_task_page
Expand Down
2 changes: 1 addition & 1 deletion pages/task/detail_modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dash import Input, Output, State, callback
from dash.exceptions import PreventUpdate

from pages.task.utils import get_task_detail_items
from pages.task.task_utils import get_task_detail_items


def render_task_detail_modal() -> fac.AntdModal:
Expand Down
8 changes: 3 additions & 5 deletions pages/task/task_page.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# 页面结构:
# 1. Store组件
# - task-store: 存储任务数据
# - task-store: 存储任务数据, task数据模型转成dict
# - viewing-task-id: 存储正在查看的任务ID
# - task-loading-state: 存储任务加载状态
#
Expand All @@ -24,8 +24,7 @@
from pages.task.detail_modal import render_task_detail_modal
from pages.task.modal import render_task_modal
from pages.task.task_table import render_task_table
from pages.task.utils import ICON_STYLES, PAGE_PADDING
from scheduler.job_manager import JobManager
from pages.task.task_utils import ICON_STYLES, PAGE_PADDING, get_task_store_data


def render_task_page() -> html.Div:
Expand All @@ -34,8 +33,7 @@ def render_task_page() -> html.Div:
Returns:
包含完整页面结构的Div组件
"""
tasks = JobManager().get_task_history()
initial_tasks = [task.to_dict() for task in tasks]
initial_tasks = get_task_store_data()

return html.Div(
[
Expand Down
15 changes: 11 additions & 4 deletions pages/task/task_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@

import json
import logging
from copy import deepcopy
from typing import Any, Dict, List

import dash
import feffery_antd_components as fac
from dash import ALL, Input, Output, State, callback
from dash.exceptions import PreventUpdate

from pages.task.utils import TABLE_STYLES, prepare_task_for_display
from pages.task.task_utils import (
TABLE_STYLES,
get_task_store_data,
prepare_task_for_display,
)
from scheduler.job_manager import JobManager, TaskStatus

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -138,10 +143,12 @@ def handle_task_action(action_clicks, current_data):
JobManager().pause_task(task_id)
else:
JobManager().resume_task(task_id)
elif action == "copy":
logger.info("复制任务: %s", task_id)
JobManager().copy_task(task_id)

# 获取最新任务列表
tasks = JobManager().get_task_history()
return tasks
# 获取最新任务列表
return get_task_store_data()

except Exception as e:
logger.error("处理任务操作失败: %s", str(e))
Expand Down
15 changes: 14 additions & 1 deletion pages/task/utils.py → pages/task/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dash import html
from feffery_utils_components import FefferyJsonViewer

from scheduler.job_manager import TaskStatus
from scheduler.job_manager import JobManager, TaskStatus
from utils.datetime_helper import format_datetime

# ============= 状态常量 =============
Expand All @@ -40,6 +40,10 @@


# ============= 工具函数 =============
def get_task_store_data() -> List[Dict[str, Any]]:
"""获取任务store数据"""
tasks = JobManager().get_task_history()
return [task.to_dict() for task in tasks]


def prepare_task_for_display(task: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -126,6 +130,15 @@ def create_operation_buttons(task: Dict[str, Any]) -> fac.AntdSpace:
"action": "view",
},
),
fac.AntdButton(
fac.AntdIcon(icon="antd-reload"),
type="link",
id={
"type": "task-action",
"index": task["task_id"],
"action": "copy",
},
),
]

# 添加暂停/恢复按钮
Expand Down
36 changes: 36 additions & 0 deletions scheduler/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,42 @@ def add_task(self, task_type: str, **kwargs) -> str:

return task_id

def copy_task(self, task_id: str) -> str:
"""复制任务"""
try:
task = ModelTask.get(ModelTask.task_id == task_id)
new_task_id = self._create_job_id()

args = json.loads(task.input_params)
args["priority"] = task.priority
args["timeout"] = task.timeout

ModelTask.create(
task_id=new_task_id,
type=task.type,
name=task.name,
priority=task.priority,
status=TaskStatus.PENDING,
timeout=task.timeout,
input_params=task.input_params,
)
logger.info("任务复制成功: %s -> %s", task_id, new_task_id)

# 添加到调度器立即执行
self.scheduler.add_job(
func=self._task_wrapper,
args=(task.type, new_task_id),
kwargs=args,
id=new_task_id,
name=task.name,
trigger="date",
misfire_grace_time=SCHEDULER_CONFIG["SCHEDULER_JOB_DEFAULTS"]["misfire_grace_time"],
)
return new_task_id
except (DatabaseError, IntegrityError) as e:
logger.error("复制任务失败: %s", str(e), exc_info=True)
return ""

def pause_task(self, task_id: str) -> bool:
"""暂停指定任务"""
try:
Expand Down
1 change: 1 addition & 0 deletions scheduler/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from scheduler.tasks.fund_nav import FundNavTask

from .data_sync import DataSyncTask
from .fund_detail import FundDetailTask
from .fund_info import FundInfoTask
Expand Down
1 change: 0 additions & 1 deletion scheduler/tasks/fund_nav.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime
from typing import Any, Dict


from data_source.proxy import DataSourceProxy
from models.database import update_record
from models.fund import ModelFundNav
Expand Down
4 changes: 3 additions & 1 deletion utils/datetime_helper.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from datetime import date, datetime, timedelta
import logging
from datetime import date, datetime, timedelta
from typing import Optional, Union

logger = logging.getLogger(__name__)


def format_datetime(
dt: Union[str, datetime, None],
output_format: str = "%Y-%m-%d %H:%M",
Expand Down

0 comments on commit c99c585

Please sign in to comment.