Skip to content

Commit

Permalink
IncludeFile now returns the included file in the client (#607)
Browse files Browse the repository at this point in the history
* DRAFT: IncludeFile now returns the included file in the client and CLI

THIS IS NOT FINISHED; DO NOT MERGE AS IS.

* Fix the tests

* Forgot to update type check for multiple encoding
  • Loading branch information
romain-intel committed Aug 9, 2022
1 parent b0fab46 commit 7872778
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 44 deletions.
10 changes: 7 additions & 3 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
MetaflowNamespaceMismatch,
MetaflowInternalError,
)

from metaflow.includefile import IncludedFile
from metaflow.metaflow_config import DEFAULT_METADATA, MAX_ATTEMPTS
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS
from metaflow.unbounded_foreach import CONTROL_TASK_TAG
Expand Down Expand Up @@ -886,6 +886,7 @@ def data(self):
if filecache is None:
# TODO: Pass proper environment to properly extract artifacts
filecache = FileCache()

# "create" the metadata information that the datastore needs
# to access this object.
# TODO: We can store more information in the metadata, particularly
Expand All @@ -901,12 +902,15 @@ def data(self):
},
}
if location.startswith(":root:"):
return filecache.get_artifact(ds_type, location[6:], meta, *components)
obj = filecache.get_artifact(ds_type, location[6:], meta, *components)
else:
# Older artifacts have a location information which we can use.
return filecache.get_artifact_by_location(
obj = filecache.get_artifact_by_location(
ds_type, location, meta, *components
)
if isinstance(obj, IncludedFile):
return obj.decode(self.id)
return obj

@property
def size(self):
Expand Down
55 changes: 40 additions & 15 deletions metaflow/includefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,31 @@ def put(self, key, obj, overwrite=True):
DATACLIENTS = {"local": Local, "s3": S3}


class IncludedFile(object):
    """Marker wrapper telling the Metaflow client that an artifact is an
    included file whose real content must be fetched (decoded) before being
    handed back to the user."""

    def __init__(self, descriptor):
        # Persist the descriptor in its serialized (JSON string) form.
        self._descriptor = json.dumps(descriptor)

    @property
    def descriptor(self):
        """The JSON-encoded descriptor of the included file."""
        return self._descriptor

    def decode(self, name, var_type="Artifact"):
        """Load and return the actual file content behind this wrapper.

        `name` and `var_type` are used only for error reporting. Raises
        MetaflowException when the descriptor is unhandled or was never
        fully converted into a loadable handler.
        """
        handled, handler, err_msg = LocalFile.is_file_handled(self._descriptor)
        if not handled:
            raise MetaflowException(
                "%s '%s' could not be loaded: %s" % (var_type, name, err_msg)
            )
        if handler is None or isinstance(handler, LocalFile):
            raise MetaflowException(
                "%s '%s' was not properly converted" % (var_type, name)
            )
        return handler.load(self._descriptor)


class LocalFile:
def __init__(self, is_text, encoding, path):
self._is_text = is_text
Expand All @@ -176,7 +201,16 @@ def __init__(self, is_text, encoding, path):

@classmethod
def is_file_handled(cls, path):
# This returns a tuple:
# - True/False indicating whether the file is handled
# - None if we need to create a handler for the file, a LocalFile if
# we already know what to do with the file or a Uploader if the file
# is already present remotely (either starting with s3:// or local://)
# - An error message if file is not handled
if path:
if isinstance(path, IncludedFile):
path = path.descriptor

decoded_value = Uploader.decode_value(to_unicode(path))
if decoded_value["type"] == "self":
return (
Expand Down Expand Up @@ -338,19 +372,10 @@ def __init__(
**kwargs
)

def load_parameter(self, val):
if val is None:
return val
ok, file_type, err = LocalFile.is_file_handled(val)
if not ok:
raise MetaflowException(
"Parameter '%s' could not be loaded: %s" % (self.name, err)
)
if file_type is None or isinstance(file_type, LocalFile):
raise MetaflowException(
"Parameter '%s' was not properly converted" % self.name
)
return file_type.load(val)
def load_parameter(self, v):
    """Return the loaded file content for parameter value *v*.

    A value of None passes through unchanged; anything else is an
    IncludedFile whose decode() fetches the underlying content.
    """
    return None if v is None else v.decode(self.name, var_type="Parameter")


class Uploader:
Expand All @@ -363,11 +388,11 @@ def __init__(self, client_class):
@staticmethod
def encode_url(url_type, url, **kwargs):
# Avoid encoding twice (default -> URL -> _convert method of FilePath for example)
if url is None or len(url) == 0 or url[0] == "{":
if url is None or isinstance(url, IncludedFile):
return url
return_value = {"type": url_type, "url": url}
return_value.update(kwargs)
return json.dumps(return_value)
return IncludedFile(return_value)

@staticmethod
def decode_value(value):
Expand Down
3 changes: 3 additions & 0 deletions test/core/metaflow_test/cli_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections import namedtuple
from tempfile import NamedTemporaryFile

from metaflow.includefile import IncludedFile
from metaflow.util import is_stringish

from . import (
Expand Down Expand Up @@ -43,6 +44,8 @@ def assert_artifact(self, step, name, value, fields=None):
for field, v in fields.items():
if is_stringish(artifact):
data = json.loads(artifact)
elif isinstance(artifact, IncludedFile):
data = json.loads(artifact.descriptor)
else:
data = artifact
if not isinstance(data, dict):
Expand Down
1 change: 0 additions & 1 deletion test/core/metaflow_test/metadata_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def __init__(self, flow):
def _test_namespace(self):
from metaflow.client import Flow, get_namespace, namespace, default_namespace
from metaflow.exception import MetaflowNamespaceMismatch
import os

# test 1) METAFLOW_USER should be the default
assert_equals("user:%s" % os.environ.get("METAFLOW_USER"), get_namespace())
Expand Down
70 changes: 45 additions & 25 deletions test/core/tests/basic_include.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,48 @@ def step_all(self):
pass

def check_results(self, flow, checker):
    """Verify the IncludeFile artifacts produced by *flow* via *checker*.

    When the checker exposes no run object (CLI checker) we can only
    validate the descriptor blob for each artifact; with a run object
    (client checker) we validate the actual decoded file contents.
    """
    run = checker.get_run()
    if run is None:
        # CliChecker does not return a run object; we check to make sure
        # the returned value is the blob describing the artifact
        # (this may be improved in the future)
        descriptor_fields = [
            ("myfile_txt", {"type": "uploader-v1", "is_text": True, "encoding": None}),
            ("myfile_utf8", {"type": "uploader-v1", "is_text": True, "encoding": "utf8"}),
            ("myfile_binary", {"type": "uploader-v1", "is_text": False, "encoding": None}),
            ("myfile_overriden", {"type": "uploader-v1", "is_text": True, "encoding": None}),
        ]
        for step in flow:
            for artifact_name, fields in descriptor_fields:
                checker.assert_artifact(step.name, artifact_name, None, fields=fields)
    else:
        # In the case of the client, we check the value.
        expected_values = [
            ("myfile_txt", "Regular Text File"),
            ("myfile_utf8", u"UTF Text File \u5e74"),
            ("myfile_binary", u"UTF Text File \u5e74".encode(encoding="utf8")),
            ("myfile_overriden", "Override Text File"),
        ]
        for step in flow:
            for artifact_name, expected in expected_values:
                checker.assert_artifact(step.name, artifact_name, expected)

0 comments on commit 7872778

Please sign in to comment.