diff --git a/src/dflow/__init__.py b/src/dflow/__init__.py index ffa2a6e0..28e5c233 100644 --- a/src/dflow/__init__.py +++ b/src/dflow/__init__.py @@ -5,6 +5,7 @@ from .code_gen import gen_code from .common import (CustomArtifact, LineageClient, LocalArtifact, S3Artifact, import_func) +from .common import jsonpickle from .config import config, s3_config, set_config, set_s3_config from .context import Context from .dag import DAG @@ -44,7 +45,7 @@ "LineageClient", "Secret", "query_workflows", "query_archived_workflows", "ContainerExecutor", "ArgoStep", "ArgoWorkflow", "argo_enumerate", "path_object_of_artifact", - "CustomArtifact", "gen_code"] + "CustomArtifact", "gen_code", "jsonpickle"] if os.environ.get("DFLOW_LINEAGE"): diff --git a/src/dflow/argo_objects.py b/src/dflow/argo_objects.py index e5e6c6a0..c847ceb6 100644 --- a/src/dflow/argo_objects.py +++ b/src/dflow/argo_objects.py @@ -8,8 +8,7 @@ from copy import deepcopy from typing import Any, List, Union -import jsonpickle - +from .common import jsonpickle from .config import config, s3_config from .io import S3Artifact from .op_template import get_k8s_client diff --git a/src/dflow/code_gen.py b/src/dflow/code_gen.py index 97189ac3..7a7dd20d 100644 --- a/src/dflow/code_gen.py +++ b/src/dflow/code_gen.py @@ -1,13 +1,12 @@ import inspect import json -import jsonpickle - from .common import (input_artifact_pattern, input_parameter_pattern, step_output_artifact_pattern, step_output_parameter_pattern, task_output_artifact_pattern, task_output_parameter_pattern) +from .common import jsonpickle from .config import config from .dag import DAG from .io import InputArtifact, InputParameter, OutputArtifact, OutputParameter @@ -118,7 +117,7 @@ def render_steps(self, var_name, template): "%s.outputs.parameters['%s']" % ( step_dict[match.group(1)], match.group(2))) elif isinstance(par, dict) and "py/object" in par: - self.imports.add((None, "jsonpickle")) + self.imports.add(("dflow", "jsonpickle")) kwargs["parameters"][name] = Variable( "jsonpickle.loads(%s)" % repr(json.dumps(par))) for name, art in kwargs.get("artifacts", {}).items(): @@ -254,7 +253,7 @@ def render_dag(self, var_name, template): task_dict[match.group(1)], match.group(2))) dependencies_dict[task["name"]].append(match.group(1)) elif isinstance(par, dict) and "py/object" in par: - self.imports.add((None, "jsonpickle")) + self.imports.add(("dflow", "jsonpickle")) kwargs["parameters"][name] = Variable( "jsonpickle.loads(%s)" % repr(json.dumps(par))) for name, art in kwargs.get("artifacts", {}).items(): diff --git a/src/dflow/common.py b/src/dflow/common.py index d498201d..f228ece6 100644 --- a/src/dflow/common.py +++ b/src/dflow/common.py @@ -8,7 +8,7 @@ from importlib import import_module from typing import Any, Dict, List, Union -import jsonpickle +import jsonpickle as jp from .config import config as global_config from .config import s3_config @@ -45,7 +45,7 @@ r"^{{tasks\.(.*?)\.outputs\.artifacts\.(.*?)}}$") -class CustomHandler(jsonpickle.handlers.BaseHandler): +class CustomHandler(jp.handlers.BaseHandler): def flatten(self, obj, data): data.update(obj.to_dict()) return data @@ -253,3 +253,16 @@ def download(self, name: str, path: str): def render(self, template, name: str): return template + + +class CustomPickler: + __path__ = jp.__path__ + + def dumps(self, obj, **kwargs): + return jp.dumps(obj, keys=True, **kwargs) + + def loads(self, s, **kwargs): + return jp.loads(s, keys=True, **kwargs) + + +jsonpickle = CustomPickler() diff --git a/src/dflow/io.py b/src/dflow/io.py index 8848bc99..80e9a3fc 100644 --- a/src/dflow/io.py +++ b/src/dflow/io.py @@ -4,10 +4,8 @@ from copy import copy, deepcopy from typing import Any, Dict, List, Optional, Union -import jsonpickle - -from .common import (CustomArtifact, LocalArtifact, S3Artifact, param_errmsg, - param_regex) +from .common import (CustomArtifact, LocalArtifact, S3Artifact, jsonpickle, + param_errmsg, param_regex) from .config import config from .utils import randstr, s3_config, upload_s3 diff --git a/src/dflow/python/opio.py b/src/dflow/python/opio.py index de9b1ec6..c5acff51 100644 --- a/src/dflow/python/opio.py +++ b/src/dflow/python/opio.py @@ -3,9 +3,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Set, Union -import jsonpickle - -from ..common import CustomHandler, S3Artifact +from ..common import CustomHandler, S3Artifact, jsonpickle from ..config import config from ..io import PVC, type_to_str diff --git a/src/dflow/python/python_op_template.py b/src/dflow/python/python_op_template.py index 509b12f4..3bd32282 100644 --- a/src/dflow/python/python_op_template.py +++ b/src/dflow/python/python_op_template.py @@ -4,10 +4,8 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Type, Union -import jsonpickle - from .. import __path__ -from ..common import S3Artifact +from ..common import S3Artifact, jsonpickle from ..config import config from ..io import (PVC, InputArtifact, InputParameter, Inputs, OutputArtifact, OutputParameter, Outputs) @@ -449,8 +447,8 @@ def render_script(self): script += handle_packages_script( "%s/inputs/artifacts/dflow_python_packages" % self.tmp_root) - script += "import json, jsonpickle\n" - script += "from dflow import config, s3_config\n" + script += "import json\n" + script += "from dflow import config, jsonpickle, s3_config\n" script += "config.update(jsonpickle.loads(r'''%s'''))\n" % \ jsonpickle.dumps(config) script += "s3_config.update(jsonpickle.loads(r'''%s'''))\n" % \ @@ -483,7 +481,7 @@ def render_script(self): script += "%s = cloudpickle.loads(%s)\n" % \ (class_name, cloudpickle.dumps(op_class)) - script += "import os, sys, traceback, jsonpickle\n" + script += "import os, sys, traceback\n" script += "from dflow.python import OPIO, TransientError, FatalError\n" script += "from dflow.python.utils import handle_input_artifact," \ " handle_input_parameter\n" diff --git a/src/dflow/python/utils.py b/src/dflow/python/utils.py index 788970f1..702a347a 100644 --- a/src/dflow/python/utils.py +++ b/src/dflow/python/utils.py @@ -5,8 +5,7 @@ from pathlib import Path from typing import Dict, List, Set -import jsonpickle - +from ..common import jsonpickle from ..config import config from ..utils import (artifact_classes, assemble_path_object, convert_dflow_list, copy_file, expand, flatten, randstr, diff --git a/src/dflow/step.py b/src/dflow/step.py index e2659340..2b59fc2d 100644 --- a/src/dflow/step.py +++ b/src/dflow/step.py @@ -10,10 +10,9 @@ from copy import copy, deepcopy from typing import Any, Dict, List, Optional, Union -import jsonpickle - from .common import (CustomArtifact, HTTPArtifact, LocalArtifact, S3Artifact, - field_errmsg, field_regex, key_errmsg, key_regex) + field_errmsg, field_regex, jsonpickle, key_errmsg, + key_regex) from .config import config, s3_config from .context_syntax import GLOBAL_CONTEXT from .executor import Executor diff --git a/src/dflow/util_ops.py b/src/dflow/util_ops.py index a12daedb..fd81d36a 100644 --- a/src/dflow/util_ops.py +++ b/src/dflow/util_ops.py @@ -1,7 +1,5 @@ -import jsonpickle - from . import __path__ -from .common import S3Artifact +from .common import S3Artifact, jsonpickle from .config import config from .io import (InputArtifact, InputParameter, Inputs, OutputArtifact, OutputParameter) diff --git a/src/dflow/utils.py b/src/dflow/utils.py index db08d2b3..e9902236 100644 --- a/src/dflow/utils.py +++ b/src/dflow/utils.py @@ -21,9 +21,7 @@ from pathlib import Path, PosixPath, WindowsPath from typing import Dict, List, Optional, Set, Tuple, Union -import jsonpickle - -from .common import LocalArtifact, S3Artifact +from .common import LocalArtifact, S3Artifact, jsonpickle from .config import config, s3_config try: diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index 5550a461..9b3bb45b 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -6,10 +6,8 @@ from copy import deepcopy from typing import Any, Dict, List, Optional, Union -import jsonpickle - from .argo_objects import ArgoStep, ArgoWorkflow -from .common import subdomain_errmsg, subdomain_regex +from .common import jsonpickle, subdomain_errmsg, subdomain_regex from .config import config, s3_config from .context import Context from .context_syntax import GLOBAL_CONTEXT