Skip to content

Commit

Permalink
Minor fixes from PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudw committed Apr 6, 2022
1 parent 21c07e5 commit 0699fff
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 31 deletions.
3 changes: 2 additions & 1 deletion metaflow/current.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import namedtuple
from .flowspec import FlowSpec
import os

Parallel = namedtuple("Parallel", ["main_ip", "num_nodes", "node_index"])
Expand Down Expand Up @@ -65,7 +66,7 @@ def is_running_flow(self):
return self._is_running

@property
def flow(self):
def flow(self) -> FlowSpec:
return self._flow

@property
Expand Down
6 changes: 0 additions & 6 deletions metaflow/plugins/kfp/kfp_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,6 @@ def task_finished(
Analogous to step_functions_decorator.py
Invoked from Task.run_step within the KFP container
"""
# TODO(yunw): Is this part of code needed? - Error: 'KfpInternalDecorator' object has no attribute 'metadata'
# if self.metadata.TYPE == "local":
# # Note that the datastore is *always* Amazon S3 (see
# # runtime_task_created function).
# sync_local_metadata_to_datastore(DATASTORE_LOCAL_DIR, self.task_datastore)

if not is_task_ok:
# The task finished with an exception - execution won't
# continue so no need to do anything here.
Expand Down
6 changes: 5 additions & 1 deletion metaflow/plugins/kfp/kfp_foreach_splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def get_foreach_splits(
parent_context_step_name, current_node, passed_in_split_indexes
)

# datastore version of `input_context = json.loads(self.s3.get(foreach_splits_path).text)`
foreach_splits_path = self._build_foreach_splits_prefix(
parent_context_step_name, context_node_task_id
)
Expand Down Expand Up @@ -189,10 +190,13 @@ def upload_foreach_splits_to_flow_root(self, foreach_splits: Dict):
# Only S3_datastore is supported for KFP plug-in.
# Safely assume _storage_impl is of type S3Storage here
s3_datastore: S3Storage = self.flow_datastore._storage_impl
foreach_splits_path: str = self._build_foreach_splits_prefix(
self.step_name, current.task_id
)
s3_datastore.save_bytes(
path_and_bytes_iter=[
(
self._build_foreach_splits_prefix(self.step_name, current.task_id),
foreach_splits_path,
json.dumps(foreach_splits),
)
],
Expand Down
23 changes: 0 additions & 23 deletions metaflow/plugins/kfp/tests/flows/card_flow.py

This file was deleted.

3 changes: 3 additions & 0 deletions metaflow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ def get_username():
Return the name of the current user, or None if the current user
could not be determined.
"""
if METAFLOW_USER:
return METAFLOW_USER

# note: the order of the list matters
ENVVARS = ["METAFLOW_USER", "SUDO_USER", "USERNAME", "USER"]
for var in ENVVARS:
Expand Down

0 comments on commit 0699fff

Please sign in to comment.