From 7872778a56d518d3abff87eda34ad8139d453446 Mon Sep 17 00:00:00 2001
From: Romain
Date: Thu, 29 Jul 2021 15:51:10 -0700
Subject: [PATCH] IncludeFile now returns the included file in the client
 (#607)

* DRAFT: IncludeFile now returns the included file in the client and CLI

THIS IS NOT FINISHED; DO NOT MERGE AS IS.

* Fix the tests

* Forgot to update type check for multiple encoding
---
 metaflow/client/core.py                   | 10 +++-
 metaflow/includefile.py                   | 55 +++++++++++++-----
 test/core/metaflow_test/cli_check.py      |  3 +
 test/core/metaflow_test/metadata_check.py |  1 -
 test/core/tests/basic_include.py          | 70 +++++++++++++++--------
 5 files changed, 95 insertions(+), 44 deletions(-)

diff --git a/metaflow/client/core.py b/metaflow/client/core.py
index 9657bef2574..f4c9d5a5977 100644
--- a/metaflow/client/core.py
+++ b/metaflow/client/core.py
@@ -14,7 +14,7 @@
     MetaflowNamespaceMismatch,
     MetaflowInternalError,
 )
-
+from metaflow.includefile import IncludedFile
 from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS
 from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
 from metaflow.unbounded_foreach import CONTROL_TASK_TAG
@@ -886,6 +886,7 @@ def data(self):
         if filecache is None:
             # TODO: Pass proper environment to properly extract artifacts
             filecache = FileCache()
+
         # "create" the metadata information that the datastore needs
         # to access this object.
         # TODO: We can store more information in the metadata, particularly
@@ -901,12 +902,15 @@ def data(self):
             },
         }
         if location.startswith(":root:"):
-            return filecache.get_artifact(ds_type, location[6:], meta, *components)
+            obj = filecache.get_artifact(ds_type, location[6:], meta, *components)
         else:
             # Older artifacts have a location information which we can use.
-            return filecache.get_artifact_by_location(
+            obj = filecache.get_artifact_by_location(
                 ds_type, location, meta, *components
             )
+        if isinstance(obj, IncludedFile):
+            return obj.decode(self.id)
+        return obj
 
     @property
     def size(self):
diff --git a/metaflow/includefile.py b/metaflow/includefile.py
index b3b18ccf0fa..f1af49f3dab 100644
--- a/metaflow/includefile.py
+++ b/metaflow/includefile.py
@@ -168,6 +168,31 @@ def put(self, key, obj, overwrite=True):
 
 DATACLIENTS = {"local": Local, "s3": S3}
 
+
+class IncludedFile(object):
+    # Thin wrapper to indicate to the MF client that this object is special
+    # and should be handled as an IncludedFile when returning it (ie: fetching
+    # the actual content)
+
+    def __init__(self, descriptor):
+        self._descriptor = json.dumps(descriptor)
+
+    @property
+    def descriptor(self):
+        return self._descriptor
+
+    def decode(self, name, var_type="Artifact"):
+        ok, file_type, err = LocalFile.is_file_handled(self._descriptor)
+        if not ok:
+            raise MetaflowException(
+                "%s '%s' could not be loaded: %s" % (var_type, name, err)
+            )
+        if file_type is None or isinstance(file_type, LocalFile):
+            raise MetaflowException(
+                "%s '%s' was not properly converted" % (var_type, name)
+            )
+        return file_type.load(self._descriptor)
+
+
 class LocalFile:
     def __init__(self, is_text, encoding, path):
         self._is_text = is_text
@@ -176,7 +201,16 @@ def __init__(self, is_text, encoding, path):
 
     @classmethod
     def is_file_handled(cls, path):
+        # This returns a tuple:
+        # - True/False indicating whether the file is handled
+        # - None if we need to create a handler for the file, a LocalFile if
+        #   we already know what to do with the file or a Uploader if the file
+        #   is already present remotely (either starting with s3:// or local://)
+        # - An error message if file is not handled
         if path:
+            if isinstance(path, IncludedFile):
+                path = path.descriptor
             decoded_value = Uploader.decode_value(to_unicode(path))
             if decoded_value["type"] == "self":
                 return (
@@ -338,19 +372,10 @@ def __init__(
             **kwargs
         )
 
-    def load_parameter(self, val):
-        if val is None:
-            return val
-        ok, file_type, err = LocalFile.is_file_handled(val)
-        if not ok:
-            raise MetaflowException(
-                "Parameter '%s' could not be loaded: %s" % (self.name, err)
-            )
-        if file_type is None or isinstance(file_type, LocalFile):
-            raise MetaflowException(
-                "Parameter '%s' was not properly converted" % self.name
-            )
-        return file_type.load(val)
+    def load_parameter(self, v):
+        if v is None:
+            return v
+        return v.decode(self.name, var_type="Parameter")
 
 
 class Uploader:
@@ -363,11 +388,11 @@ def __init__(self, client_class):
     @staticmethod
     def encode_url(url_type, url, **kwargs):
         # Avoid encoding twice (default -> URL -> _convert method of FilePath for example)
-        if url is None or len(url) == 0 or url[0] == "{":
+        if url is None or isinstance(url, IncludedFile):
             return url
         return_value = {"type": url_type, "url": url}
         return_value.update(kwargs)
-        return json.dumps(return_value)
+        return IncludedFile(return_value)
 
     @staticmethod
     def decode_value(value):
diff --git a/test/core/metaflow_test/cli_check.py b/test/core/metaflow_test/cli_check.py
index e558add8fb2..f6edc00a3a3 100644
--- a/test/core/metaflow_test/cli_check.py
+++ b/test/core/metaflow_test/cli_check.py
@@ -4,6 +4,7 @@
 from collections import namedtuple
 from tempfile import NamedTemporaryFile
 
+from metaflow.includefile import IncludedFile
 from metaflow.util import is_stringish
 
 from . import (
@@ -43,6 +44,8 @@ def assert_artifact(self, step, name, value, fields=None):
             for field, v in fields.items():
                 if is_stringish(artifact):
                     data = json.loads(artifact)
+                elif isinstance(artifact, IncludedFile):
+                    data = json.loads(artifact.descriptor)
                 else:
                     data = artifact
                 if not isinstance(data, dict):
diff --git a/test/core/metaflow_test/metadata_check.py b/test/core/metaflow_test/metadata_check.py
index 24c014ded34..b536d2ce331 100644
--- a/test/core/metaflow_test/metadata_check.py
+++ b/test/core/metaflow_test/metadata_check.py
@@ -27,7 +27,6 @@ def __init__(self, flow):
     def _test_namespace(self):
         from metaflow.client import Flow, get_namespace, namespace, default_namespace
         from metaflow.exception import MetaflowNamespaceMismatch
-        import os
 
         # test 1) METAFLOW_USER should be the default
         assert_equals("user:%s" % os.environ.get("METAFLOW_USER"), get_namespace())
diff --git a/test/core/tests/basic_include.py b/test/core/tests/basic_include.py
index e79bf653f7f..2df503538b1 100644
--- a/test/core/tests/basic_include.py
+++ b/test/core/tests/basic_include.py
@@ -42,28 +42,48 @@ def step_all(self):
         pass
 
     def check_results(self, flow, checker):
-        for step in flow:
-            checker.assert_artifact(
-                step.name,
-                "myfile_txt",
-                None,
-                fields={"type": "uploader-v1", "is_text": True, "encoding": None},
-            )
-            checker.assert_artifact(
-                step.name,
-                "myfile_utf8",
-                None,
-                fields={"type": "uploader-v1", "is_text": True, "encoding": "utf8"},
-            )
-            checker.assert_artifact(
-                step.name,
-                "myfile_binary",
-                None,
-                fields={"type": "uploader-v1", "is_text": False, "encoding": None},
-            )
-            checker.assert_artifact(
-                step.name,
-                "myfile_overriden",
-                None,
-                fields={"type": "uploader-v1", "is_text": True, "encoding": None},
-            )
+        run = checker.get_run()
+        if run is None:
+            # CliChecker does not return a run object; we check to make sure
+            # the returned value is the blob describing the artifact
+            # (this may be improved in the future)
+            for step in flow:
+                checker.assert_artifact(
+                    step.name,
+                    "myfile_txt",
+                    None,
+                    fields={"type": "uploader-v1", "is_text": True, "encoding": None},
+                )
+                checker.assert_artifact(
+                    step.name,
+                    "myfile_utf8",
+                    None,
+                    fields={"type": "uploader-v1", "is_text": True, "encoding": "utf8"},
+                )
+                checker.assert_artifact(
+                    step.name,
+                    "myfile_binary",
+                    None,
+                    fields={"type": "uploader-v1", "is_text": False, "encoding": None},
+                )
+                checker.assert_artifact(
+                    step.name,
+                    "myfile_overriden",
+                    None,
+                    fields={"type": "uploader-v1", "is_text": True, "encoding": None},
+                )
+        else:
+            # In the case of the client, we check the value.
+            for step in flow:
+                checker.assert_artifact(step.name, "myfile_txt", "Regular Text File")
+                checker.assert_artifact(
+                    step.name, "myfile_utf8", u"UTF Text File \u5e74"
+                )
+                checker.assert_artifact(
+                    step.name,
+                    "myfile_binary",
+                    u"UTF Text File \u5e74".encode(encoding="utf8"),
+                )
+                checker.assert_artifact(
+                    step.name, "myfile_overriden", "Override Text File"
+                )
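
For illustration, a minimal sketch of how this change surfaces in the client
API (the flow name and run id below are hypothetical; the artifact names come
from test/core/tests/basic_include.py):

    from metaflow import Run

    # Hypothetical run of a flow that declares IncludeFile parameters
    run = Run("BasicIncludeFlow/1234")
    task = run["start"].task

    # Previously, reading an IncludeFile artifact returned the JSON blob
    # describing the upload ({"type": "uploader-v1", "url": ..., ...}).
    # With this patch, DataArtifact.data detects the IncludedFile wrapper
    # and calls decode(), so the file's actual content is returned instead.
    print(task["myfile_txt"].data)   # -> "Regular Text File"
    print(task["myfile_utf8"].data)  # -> u"UTF Text File \u5e74"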