From 8e33f9997dbff58cb622163819e9bb7d656b5dfe Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 29 Jul 2024 14:31:05 -0700 Subject: [PATCH 1/4] skip partitions Signed-off-by: Yee Hing Tong --- flytekit/core/artifact.py | 5 +++++ .../flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/flytekit/core/artifact.py b/flytekit/core/artifact.py index 954151504f..e56643f61b 100644 --- a/flytekit/core/artifact.py +++ b/flytekit/core/artifact.py @@ -318,6 +318,9 @@ def set_reference_artifact(self, artifact: Artifact): p.reference_artifact = artifact def __getattr__(self, item): + if item == "partitions" or item == "_partitions": + print("PARTITIONS: I'm in an uninitialized state!!!!!!!!!!!!", flush=True) + raise AttributeError("Partitions in an uninitialized state, skipping partitions") if self.partitions and item in self.partitions: return self.partitions[item] raise AttributeError(f"Partition {item} not found in {self}") @@ -466,6 +469,7 @@ def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]: ... return RideCountData.create_from(df, time_partition=datetime.datetime.now()) """ + print(f"ARTFOMT: 1", flush=True) omt = FlyteContextManager.current_context().output_metadata_tracker additional = [card] additional.extend(args) @@ -489,6 +493,7 @@ def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]: additional_items=filtered_additional if filtered_additional else None, ), ) + print(f"OMT: ") return o def query( diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py index ad9b5368b0..0f0af95915 100644 --- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py +++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py @@ -241,7 +241,7 @@ class ElasticWorkerResult(NamedTuple): return_value: Any decks: List[flytekit.Deck] - om: OutputMetadata + om: Optional[OutputMetadata] def spawn_helper( @@ -435,7 +435,7 @@ def fn_partial(): if isinstance(e, FlyteRecoverableException): create_recoverable_error_file() raise - return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks) + return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=None) launcher_target_func = fn_partial launcher_args = () @@ -447,10 +447,12 @@ def fn_partial(): from torch.distributed.elastic.multiprocessing.errors import ChildFailedError try: + print("Here+++++++++++++++++++++++++ 1") out = elastic_launch( config=config, entrypoint=launcher_target_func, )(*launcher_args) + print("Here+++++++++++++++++++++++++ 2") except ChildFailedError as e: _, first_failure = e.get_first_failure() if is_recoverable_worker_error(first_failure): From 0cfdf1fefe64234ba08aa1606c951e76e42396a2 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 29 Jul 2024 14:34:33 -0700 Subject: [PATCH 2/4] remove debugging Signed-off-by: Yee Hing Tong --- flytekit/core/artifact.py | 3 --- plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py | 2 -- 2 files changed, 5 deletions(-) diff --git a/flytekit/core/artifact.py b/flytekit/core/artifact.py index e56643f61b..fba84187b3 100644 --- a/flytekit/core/artifact.py +++ b/flytekit/core/artifact.py @@ -319,7 +319,6 @@ def set_reference_artifact(self, artifact: Artifact): def __getattr__(self, item): if item == "partitions" or item == "_partitions": - print("PARTITIONS: I'm in an uninitialized state!!!!!!!!!!!!", flush=True) raise AttributeError("Partitions in an uninitialized state, skipping partitions") if self.partitions and item in self.partitions: return self.partitions[item] @@ -469,7 +468,6 @@ def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]: ... return RideCountData.create_from(df, time_partition=datetime.datetime.now()) """ - print(f"ARTFOMT: 1", flush=True) omt = FlyteContextManager.current_context().output_metadata_tracker additional = [card] additional.extend(args) @@ -493,7 +491,6 @@ def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]: additional_items=filtered_additional if filtered_additional else None, ), ) - print(f"OMT: ") return o def query( diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py index 0f0af95915..0236913bcb 100644 --- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py +++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py @@ -447,12 +447,10 @@ def fn_partial(): from torch.distributed.elastic.multiprocessing.errors import ChildFailedError try: - print("Here+++++++++++++++++++++++++ 1") out = elastic_launch( config=config, entrypoint=launcher_target_func, )(*launcher_args) - print("Here+++++++++++++++++++++++++ 2") except ChildFailedError as e: _, first_failure = e.get_first_failure() if is_recoverable_worker_error(first_failure): From 8184211ebc8a6845b8a475276b8077e430d6cfb6 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 29 Jul 2024 14:51:27 -0700 Subject: [PATCH 3/4] add unit test Signed-off-by: Yee Hing Tong --- tests/flytekit/unit/core/test_artifacts.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/flytekit/unit/core/test_artifacts.py b/tests/flytekit/unit/core/test_artifacts.py index 2eccdf52d5..9437d16add 100644 --- a/tests/flytekit/unit/core/test_artifacts.py +++ b/tests/flytekit/unit/core/test_artifacts.py @@ -619,3 +619,15 @@ def test_lims(): # test an artifact with 11 partition keys with pytest.raises(ValueError): Artifact(name="test artifact", time_partitioned=True, partition_keys=[f"key_{i}" for i in range(11)]) + + +def test_cloudpickle(): + a1_b = Artifact(name="my_data", partition_keys=["b"]) + + spec = a1_b(b="my_b_value") + import cloudpickle + + d = cloudpickle.dumps(spec) + spec2 = cloudpickle.loads(d) + + assert spec2.partitions.b.value.static_value == "my_b_value" From 1dea593b45f097fb9f5eff1aee782b3b98a5f16c Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 29 Jul 2024 15:37:59 -0700 Subject: [PATCH 4/4] Update plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py Co-authored-by: Kevin Su --- plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py index 0236913bcb..3384c9cacc 100644 --- a/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py +++ b/plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py @@ -241,7 +241,7 @@ class ElasticWorkerResult(NamedTuple): return_value: Any decks: List[flytekit.Deck] - om: Optional[OutputMetadata] + om: Optional[OutputMetadata] = None def spawn_helper(