Skip to content

Commit

Permalink
one commit on branch
Browse files Browse the repository at this point in the history
  • Loading branch information
savingoyal committed May 14, 2021
1 parent a0c23bd commit d634bc0
Show file tree
Hide file tree
Showing 36 changed files with 875 additions and 274 deletions.
62 changes: 40 additions & 22 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from . import current
from .util import resolve_identity, decompress_list, write_latest_run_id, \
get_latest_run_id, to_unicode
from .cli_args import cli_args
from .task import MetaflowTask
from .exception import CommandException, MetaflowException
from .graph import FlowGraph
Expand All @@ -30,6 +31,7 @@
from .monitor import Monitor
from .R import use_r, metaflow_r_version
from .mflog import mflog, LOG_SOURCES
from .unbounded_foreach import UBF_CONTROL, UBF_TASK


ERASE_TO_EOL = '\033[K'
Expand Down Expand Up @@ -411,14 +413,14 @@ def echo_unicode(line, **kwargs):
show_default=True,
help='Index of this foreach split.')
@click.option('--tag',
'tags',
'opt_tag',
multiple=True,
default=None,
help="Annotate this run with the given tag. You can specify "
"this option multiple times to attach multiple tags in "
"the task.")
@click.option('--namespace',
'user_namespace',
'opt_namespace',
default=None,
help="Change namespace from the default (your username) to "
"the specified tag.")
Expand All @@ -442,26 +444,32 @@ def echo_unicode(line, **kwargs):
help="Add a decorator to this task. You can specify this "
"option multiple times to attach multiple decorators "
"to this task.")
@click.pass_obj
def step(obj,
@click.option('--ubf-context',
default='none',
type=click.Choice(['none', UBF_CONTROL, UBF_TASK]),
help="Provides additional context if this task is of type "
"unbounded foreach.")
@click.pass_context
def step(ctx,
step_name,
tags=None,
opt_tag=None,
run_id=None,
task_id=None,
input_paths=None,
split_index=None,
user_namespace=None,
opt_namespace=None,
retry_count=None,
max_user_code_retries=None,
clone_only=None,
clone_run_id=None,
decospecs=None):
if user_namespace is not None:
namespace(user_namespace or None)
decospecs=None,
ubf_context=None):
if opt_namespace is not None:
namespace(opt_namespace or None)

func = None
try:
func = getattr(obj.flow, step_name)
func = getattr(ctx.obj.flow, step_name)
except:
raise CommandException("Step *%s* doesn't exist." % step_name)
if not func.is_step:
Expand All @@ -473,21 +481,30 @@ def step(obj,
if decospecs:
decorators._attach_decorators_to_step(func, decospecs)

obj.datastore.datastore_root = obj.datastore_root
if obj.datastore.datastore_root is None:
obj.datastore.datastore_root = \
obj.datastore.get_datastore_root_from_config(obj.echo)
ctx.obj.datastore.datastore_root = ctx.obj.datastore_root
if ctx.obj.datastore.datastore_root is None:
ctx.obj.datastore.datastore_root = \
ctx.obj.datastore.get_datastore_root_from_config(ctx.obj.echo)

obj.metadata.add_sticky_tags(tags=tags)
step_kwargs = ctx.params
# Remove argument `step_name` from `step_kwargs`.
step_kwargs.pop('step_name', None)
# Remove `opt_*` prefix from (some) option keys.
step_kwargs = dict([(k[4:], v) if k.startswith('opt_') else (k, v)
for k, v in step_kwargs.items()])
cli_args._set_step_kwargs(step_kwargs)

ctx.obj.metadata.add_sticky_tags(tags=opt_tag)
paths = decompress_list(input_paths) if input_paths else []

task = MetaflowTask(obj.flow,
obj.datastore,
obj.metadata,
obj.environment,
obj.echo,
obj.event_logger,
obj.monitor)
task = MetaflowTask(ctx.obj.flow,
ctx.obj.datastore,
ctx.obj.metadata,
ctx.obj.environment,
ctx.obj.echo,
ctx.obj.event_logger,
ctx.obj.monitor,
ubf_context)
if clone_only:
task.clone_only(step_name,
run_id,
Expand Down Expand Up @@ -834,6 +851,7 @@ def start(ctx,
branch=True)
cov.start()

cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
ctx.obj.echo_always = echo_always
ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__)
Expand Down
47 changes: 47 additions & 0 deletions metaflow/cli_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# This module provides a global singleton `cli_args` which stores the `top` and
# `step` level options for the metaflow CLI. This allows decorators to have
# access to the CLI options instead of relying (solely) on the click context.
# TODO: Fold `dict_to_cli_options` as a private method of this `CLIArgs`

from .util import dict_to_cli_options

class CLIArgs(object):
def __init__(self):
self._top_kwargs = {}
self._step_kwargs = {}

def _set_step_kwargs(self, kwargs):
self._step_kwargs = kwargs

def _set_top_kwargs(self, kwargs):
self._top_kwargs = kwargs

@property
def top_kwargs(self):
return self._top_kwargs

@property
def step_kwargs(self):
return self._step_kwargs

def step_command(self,
executable,
script,
step_name,
top_kwargs=None,
step_kwargs=None):
cmd = [executable, '-u', script]
if top_kwargs is None:
top_kwargs = self._top_kwargs
if step_kwargs is None:
step_kwargs = self._step_kwargs

top_args_list = [arg for arg in dict_to_cli_options(top_kwargs)]
cmd.extend(top_args_list)
cmd.extend(['step', step_name])
step_args_list = [arg for arg in dict_to_cli_options(step_kwargs)]
cmd.extend(step_args_list)

return cmd

cli_args = CLIArgs()
53 changes: 51 additions & 2 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from metaflow.metaflow_config import DEFAULT_METADATA
from metaflow.plugins import ENVIRONMENTS, METADATA_PROVIDERS

from metaflow.unbounded_foreach import CONTROL_TASK_TAG
from metaflow.util import cached_property, resolve_identity, to_unicode

from .filecache import FileCache
Expand Down Expand Up @@ -1120,7 +1120,8 @@ def task(self):
A task in the step
"""
for t in self:
return t
if CONTROL_TASK_TAG not in t.tags:
return t

def tasks(self, *tags):
"""
Expand All @@ -1142,6 +1143,54 @@ def tasks(self, *tags):
"""
return self._filtered_children(*tags)

@property
def control_task(self):
"""
Returns a Control Task object belonging to this step.
This is useful when the step only contains one control task.
Returns
-------
Task
A control task in the step
"""
children = super(Step, self).__iter__()
for t in children:
if CONTROL_TASK_TAG in t.tags:
return t

def control_tasks(self, *tags):
"""
Returns an iterator over all the control tasks in the step.
An optional filter is available that allows you to filter on tags. The
control tasks returned if the filter is specified will contain all the
tags specified.
Parameters
----------
tags : string
Tags to match
Returns
-------
Iterator[Task]
Iterator over Control Task objects in this step
"""
children = super(Step, self).__iter__()
filter_tags = [CONTROL_TASK_TAG]
filter_tags.extend(tags)
for child in children:
if all(tag in child.tags for tag in filter_tags):
yield child

def __iter__(self):
children = super(Step, self).__iter__()
for t in children:
if CONTROL_TASK_TAG not in t.tags:
yield t

@property
def finished_at(self):
"""
Expand Down
18 changes: 14 additions & 4 deletions metaflow/datastore/util/s3util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import random
import time
import sys
import os

from metaflow.exception import MetaflowException
from metaflow.metaflow_config import S3_ENDPOINT_URL, S3_VERIFY_CERTIFICATE

S3_NUM_RETRIES = 7

TEST_S3_RETRY = 'TEST_S3_RETRY' in os.environ

def get_s3_client():
from metaflow.plugins.aws.aws_client import get_aws_client
return get_aws_client(
Expand All @@ -21,7 +24,13 @@ def retry_wrapper(self, *args, **kwargs):
last_exc = None
for i in range(S3_NUM_RETRIES):
try:
return f(self, *args, **kwargs)
ret = f(self, *args, **kwargs)
if TEST_S3_RETRY and i == 0:
raise Exception("TEST_S3_RETRY env var set. "
"Pretending that an S3 op failed. "
"This is not a real failure.")
else:
return ret
except MetaflowException as ex:
# MetaflowExceptions are not related to AWS, don't retry
raise
Expand All @@ -35,7 +44,8 @@ def retry_wrapper(self, *args, **kwargs):
% (function_name, ex, S3_NUM_RETRIES - i))
self.reset_client(hard_reset=True)
last_exc = ex
# exponential backoff
time.sleep(2**i + random.randint(0, 5))
# exponential backoff for real failures
if not (TEST_S3_RETRY and i == 0):
time.sleep(2**i + random.randint(0, 5))
raise last_exc
return retry_wrapper
return retry_wrapper
21 changes: 15 additions & 6 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class UnknownStepDecoratorException(MetaflowException):

def __init__(self, deconame):
from .plugins import STEP_DECORATORS
decos = ','.join(t.name for t in STEP_DECORATORS)
decos = ', '.join(t.name for t in STEP_DECORATORS
if not t.name.endswith('_internal'))
msg = "Unknown step decorator *{deconame}*. The following decorators are "\
"supported: *{decos}*".format(deconame=deconame, decos=decos)
super(UnknownStepDecoratorException, self).__init__(msg)
Expand All @@ -57,7 +58,7 @@ class UnknownFlowDecoratorException(MetaflowException):

def __init__(self, deconame):
from .plugins import FLOW_DECORATORS
decos = ','.join(t.name for t in FLOW_DECORATORS)
decos = ', '.join(t.name for t in FLOW_DECORATORS)
msg = "Unknown flow decorator *{deconame}*. The following decorators are "\
"supported: *{decos}*".format(deconame=deconame, decos=decos)
super(UnknownFlowDecoratorException, self).__init__(msg)
Expand Down Expand Up @@ -220,6 +221,7 @@ def step_task_retry_count(self):
Returns a tuple of (user_code_retries, error_retries). Error retries
are attempts to run the process after the user code has failed all
its retries.
Return None, None to disable all retries by the (native) runtime.
"""
return 0, 0

Expand All @@ -235,7 +237,8 @@ def runtime_task_created(self,
task_id,
split_index,
input_paths,
is_cloned):
is_cloned,
ubf_context):
"""
Called when the runtime has created a task related to this step.
"""
Expand All @@ -247,7 +250,11 @@ def runtime_finished(self, exception):
"""
pass

def runtime_step_cli(self, cli_args, retry_count, max_user_code_retries):
def runtime_step_cli(self,
cli_args,
retry_count,
max_user_code_retries,
ubf_context):
"""
Access the command line for a step execution in the runtime context.
"""
Expand All @@ -262,7 +269,8 @@ def task_pre_step(self,
flow,
graph,
retry_count,
max_user_code_retries):
max_user_code_retries,
ubf_context):
"""
Run before the step function in the task context.
"""
Expand All @@ -273,7 +281,8 @@ def task_decorate(self,
flow,
graph,
retry_count,
max_user_code_retries):
max_user_code_retries,
ubf_context):
return step_func

def task_post_step(self,
Expand Down
Loading

0 comments on commit d634bc0

Please sign in to comment.