diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7e92f6d6e3a..c56f629f5da 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,12 +7,9 @@ on: pull_request: branches: - master -<<<<<<< HEAD - feature/kfp -======= workflow_call: ->>>>>>> master jobs: pre-commit: runs-on: ubuntu-latest @@ -28,73 +25,23 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest] -<<<<<<< HEAD - lang: [Python] -======= - ver: ['3.5', '3.6', '3.7', '3.8', '3.9','3.10',] ->>>>>>> master + ver: ['3.9','3.10',] steps: - uses: actions/checkout@v2 -<<<<<<< HEAD - - name: Set up Python 3.9 - uses: actions/setup-python@v2 - with: - python-version: '3.9' -======= - name: Set up Python uses: actions/setup-python@v2 with: python-version: ${{ matrix.ver }} ->>>>>>> master - name: Install Python ${{ matrix.ver }} dependencies run: | python3 -m pip install --upgrade pip -<<<<<<< HEAD python3 -m pip install tox numpy black - name: Python Code Format Check run: black --target-version py39 --diff --check ./metaflow/plugins/kfp/*.py -======= - python3 -m pip install tox numpy - name: Execute Python tests run: tox - - R: - name: core / R ${{ matrix.ver }} on ${{ matrix.os }} - runs-on: ${{ matrix.os }} - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest, macos-latest] - ver: ['4.0', '4.1'] - - steps: - - uses: actions/checkout@v2 - - name: Set up ${{ matrix.ver }} - uses: r-lib/actions/setup-r@v2 - with: - r-version: ${{ matrix.ver }} - - - name: Install R ${{ matrix.ver }} system dependencies - if: matrix.os == 'ubuntu-latest' - run: sudo apt-get update; sudo apt-get install -y libcurl4-openssl-dev qpdf libgit2-dev - - - name: Install R ${{ matrix.ver }} Rlang dependencies - run: | - python3 -m pip install . - Rscript -e 'install.packages("devtools", repos="https://cloud.r-project.org", Ncpus=8)' - Rscript -e 'devtools::install_deps("R", dependencies=TRUE, repos="https://cloud.r-project.org", upgrade="default")' - R CMD INSTALL R - Rscript -e 'install.packages(c("data.table", "caret", "glmnet", "Matrix", "rjson"), repos="https://cloud.r-project.org", Ncpus=8)' - - - name: Execute R tests - run: | - cd R/tests - Rscript run_tests.R ->>>>>>> master - - diff --git a/metaflow/cli.py b/metaflow/cli.py index 3fbf4698f4d..3f18d9c2948 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -28,12 +28,6 @@ from .runtime import NativeRuntime from .package import MetaflowPackage -<<<<<<< HEAD -from .plugins import ENVIRONMENTS, LOGGING_SIDECARS, METADATA_PROVIDERS, MONITOR_SIDECARS -from .metaflow_config import DEFAULT_DATASTORE, DEFAULT_ENVIRONMENT, DEFAULT_EVENT_LOGGER, \ - DEFAULT_METADATA, DEFAULT_MONITOR, DEFAULT_PACKAGE_SUFFIXES, \ - METAFLOW_COVERAGE_OMIT, METAFLOW_COVERAGE_SOURCE -======= from .plugins import ( ENVIRONMENTS, LOGGING_SIDECARS, @@ -48,7 +42,6 @@ DEFAULT_MONITOR, DEFAULT_PACKAGE_SUFFIXES, ) ->>>>>>> master from .metaflow_environment import MetaflowEnvironment from .pylint_wrapper import PyLint from .event_logger import EventLogger @@ -58,16 +51,9 @@ from .unbounded_foreach import UBF_CONTROL, UBF_TASK -<<<<<<< HEAD - -ERASE_TO_EOL = '\033[K' -HIGHLIGHT = 'red' -INDENT = ' ' * 4 -======= ERASE_TO_EOL = "\033[K" HIGHLIGHT = "red" INDENT = " " * 4 ->>>>>>> master LOGGER_TIMESTAMP = "magenta" LOGGER_COLOR = "green" @@ -518,15 +504,6 @@ def step( if opt_namespace is not None: namespace(opt_namespace or None) - if ctx.obj.coverage: - from coverage import Coverage - cov = Coverage(data_suffix=True, - auto_data=True, - 
source=METAFLOW_COVERAGE_SOURCE.split(","), - omit=METAFLOW_COVERAGE_OMIT.split(",") if METAFLOW_COVERAGE_OMIT else None, - branch=True) - cov.start() - func = None try: func = getattr(ctx.obj.flow, step_name) @@ -534,14 +511,7 @@ def step( raise CommandException("Step *%s* doesn't exist." % step_name) if not func.is_step: raise CommandException("Function *%s* is not a step." % step_name) -<<<<<<< HEAD - echo('Executing a step, *%s*' % step_name, - fg='magenta', - bold=False, - err=False) -======= echo("Executing a step, *%s*" % step_name, fg="magenta", bold=False) ->>>>>>> master if decospecs: decorators._attach_decorators_to_step(func, decospecs) @@ -584,12 +554,6 @@ def step( echo("Success", fg="green", bold=True, indent=True) -<<<<<<< HEAD - echo('Success', fg='green', bold=True, indent=True, err=False) - if ctx.obj.coverage: - cov.stop() -======= ->>>>>>> master @parameters.add_custom_parameters(deploy_mode=False) @cli.command(help="Internal command to initialize a run.") @@ -948,27 +912,11 @@ def start( if use_r(): version = metaflow_r_version() -<<<<<<< HEAD - echo('Metaflow %s' % version, fg='magenta', bold=True, nl=False) - echo(" executing *%s*" % ctx.obj.flow.name, fg='magenta', nl=False) - echo(" for *%s*" % resolve_identity(), fg='magenta') - - if coverage: - from coverage import Coverage - cov = Coverage(data_suffix=True, - auto_data=True, - source=METAFLOW_COVERAGE_SOURCE.split(","), - omit=METAFLOW_COVERAGE_OMIT.split(",") if METAFLOW_COVERAGE_OMIT else None, - branch=True) - cov.start() -======= echo("Metaflow %s" % version, fg="magenta", bold=True, nl=False) echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False) echo(" for *%s*" % resolve_identity(), fg="magenta") ->>>>>>> master cli_args._set_top_kwargs(ctx.params) - ctx.obj.coverage = coverage ctx.obj.echo = echo ctx.obj.echo_always = echo_always ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__) @@ -1054,9 +1002,6 @@ def start( if ctx.invoked_subcommand is None: ctx.invoke(check) - if coverage: - cov.stop() - def _reconstruct_cli(params): for k, v in params.items(): diff --git a/metaflow/current.py b/metaflow/current.py index d4b11c72cf8..1336351c4c3 100644 --- a/metaflow/current.py +++ b/metaflow/current.py @@ -1,16 +1,12 @@ -<<<<<<< HEAD -from metaflow import FlowSpec - -======= from collections import namedtuple import os Parallel = namedtuple("Parallel", ["main_ip", "num_nodes", "node_index"]) ->>>>>>> master class Current(object): def __init__(self): + self._flow = None self._flow_name = None self._run_id = None self._step_name = None @@ -21,22 +17,6 @@ def __init__(self): self._username = None self._is_running = False -<<<<<<< HEAD - def _set_env(self, - flow=None, - flow_name=None, - run_id=None, - step_name=None, - task_id=None, - retry_count=None, - origin_run_id=None, - namespace=None, - username=None, - is_running=True): - - self._flow = flow - self._flow_name = flow_name -======= def _raise(ex): raise ex @@ -57,10 +37,10 @@ def _set_env( is_running=True, ): if flow is not None: + self._flow = flow self._flow_name = flow.name self.__class__.graph = property(fget=lambda _, flow=flow: flow._graph_info) ->>>>>>> master self._run_id = run_id self._step_name = step_name self._task_id = task_id @@ -85,7 +65,7 @@ def is_running_flow(self): return self._is_running @property - def flow(self) -> FlowSpec: + def flow(self): return self._flow @property diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 57444a2260f..04e2e9922d8 100644 --- a/metaflow/metaflow_config.py +++ 
b/metaflow/metaflow_config.py @@ -52,20 +52,20 @@ def from_conf(name, default=None): DEFAULT_PACKAGE_SUFFIXES = from_conf("METAFLOW_DEFAULT_PACKAGE_SUFFIXES", ".py,.R,.RDS") DEFAULT_AWS_CLIENT_PROVIDER = from_conf("METAFLOW_DEFAULT_AWS_CLIENT_PROVIDER", "boto3") -METAFLOW_USER = from_conf('METAFLOW_USER') +METAFLOW_USER = from_conf("METAFLOW_USER") ## # KFP configuration ### -KFP_SDK_NAMESPACE = from_conf('KFP_SDK_NAMESPACE', 'kubeflow') -KFP_SDK_API_NAMESPACE = from_conf('KFP_SDK_API_NAMESPACE', 'kubeflow') -KFP_TTL_SECONDS_AFTER_FINISHED = from_conf('KFP_TTL_SECONDS_AFTER_FINISHED', None) -KFP_USER_DOMAIN = from_conf('KFP_USER_DOMAIN', '') +KFP_SDK_NAMESPACE = from_conf("KFP_SDK_NAMESPACE", "kubeflow") +KFP_SDK_API_NAMESPACE = from_conf("KFP_SDK_API_NAMESPACE", "kubeflow") +KFP_TTL_SECONDS_AFTER_FINISHED = from_conf("KFP_TTL_SECONDS_AFTER_FINISHED", None) +KFP_USER_DOMAIN = from_conf("KFP_USER_DOMAIN", "") # Note: `KFP_RUN_URL_PREFIX` is the URL prefix for KFP runs on your KFP cluster. The prefix includes # all parts of the URL except the run_id at the end which we append once the run is created. # For eg, this would look like: "https:///pipeline/#/runs/details/" -KFP_RUN_URL_PREFIX = from_conf('KFP_RUN_URL_PREFIX', "") -KFP_MAX_PARALLELISM = int(from_conf('KFP_MAX_PARALLELISM', 10)) +KFP_RUN_URL_PREFIX = from_conf("KFP_RUN_URL_PREFIX", "") +KFP_MAX_PARALLELISM = int(from_conf("KFP_MAX_PARALLELISM", 10)) ### # Datastore configuration @@ -85,15 +85,6 @@ def from_conf(name, default=None): ) # Local datatools root location DATATOOLS_LOCALROOT = from_conf( -<<<<<<< HEAD - 'METAFLOW_DATATOOLS_LOCALROOT', - '%s/%s' % (from_conf('METAFLOW_DATASTORE_SYSROOT_LOCAL'), DATATOOLS_SUFFIX) - if from_conf('METAFLOW_DATASTORE_SYSROOT_LOCAL') else None) - -# S3 endpoint url -S3_ENDPOINT_URL = from_conf('METAFLOW_S3_ENDPOINT_URL', None) -S3_VERIFY_CERTIFICATE = from_conf('METAFLOW_S3_VERIFY_CERTIFICATE', None) -======= "METAFLOW_DATATOOLS_LOCALROOT", os.path.join(from_conf("METAFLOW_DATASTORE_SYSROOT_LOCAL"), DATATOOLS_SUFFIX) if from_conf("METAFLOW_DATASTORE_SYSROOT_LOCAL") @@ -120,7 +111,6 @@ def from_conf(name, default=None): # though as this may increase failures. Note that this is the number of *retries* # so setting it to 0 means each operation will be tried once. S3_RETRY_COUNT = int(from_conf("METAFLOW_S3_RETRY_COUNT", 7)) ->>>>>>> master ### # Datastore local cache @@ -227,10 +217,6 @@ def from_conf(name, default=None): ### # Conda package root location on S3 CONDA_PACKAGE_S3ROOT = from_conf( -<<<<<<< HEAD - 'METAFLOW_CONDA_PACKAGE_S3ROOT', - '%s/conda' % from_conf('METAFLOW_DATASTORE_SYSROOT_S3')) -======= "METAFLOW_CONDA_PACKAGE_S3ROOT", "%s/conda" % from_conf("METAFLOW_DATASTORE_SYSROOT_S3"), ) @@ -239,7 +225,6 @@ def from_conf(name, default=None): # Mamba promises faster package dependency resolution times, which # should result in an appreciable speedup in flow environment initialization. CONDA_DEPENDENCY_RESOLVER = from_conf("METAFLOW_CONDA_DEPENDENCY_RESOLVER", "conda") ->>>>>>> master ### # Debug configuration @@ -275,9 +260,6 @@ def from_conf(name, default=None): METADATA_SERVICE_HEADERS["x-api-key"] = AWS_SANDBOX_API_KEY SFN_STATE_MACHINE_PREFIX = from_conf("METAFLOW_AWS_SANDBOX_STACK_NAME") -METAFLOW_COVERAGE_SOURCE = from_conf("METAFLOW_COVERAGE_SOURCE", "metaflow") -METAFLOW_COVERAGE_OMIT = from_conf("METAFLOW_COVERAGE_OMIT") - # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. 
@@ -312,25 +294,6 @@ def get_version(pkg): # PINNED_CONDA_LIBS are the libraries that metaflow depends on for execution # and are needed within a conda environment def get_pinned_conda_libs(python_version): -<<<<<<< HEAD - if python_version.startswith("3.5"): - return { - 'click': '7.1.2', - 'requests': '2.24.0', - 'boto3': '1.9.88', - 'coverage': '4.5.1' - } - else: - return { - 'click': '7.1.2', - 'requests': '2.24.0', - 'boto3': '1.14.47', - 'coverage': '4.5.4' - } - - -# Check if there is a an extension to Metaflow to load and override everything -======= return { "requests": ">=2.21.0", "boto3": ">=1.14.0", @@ -340,7 +303,6 @@ def get_pinned_conda_libs(python_version): METAFLOW_EXTENSIONS_ADDL_SUFFIXES = set([]) # Check if there are extensions to Metaflow to load and override everything ->>>>>>> master try: from metaflow.extension_support import get_modules diff --git a/metaflow/metaflow_environment.py b/metaflow/metaflow_environment.py index 8c88dd1780d..586ea7a740a 100644 --- a/metaflow/metaflow_environment.py +++ b/metaflow/metaflow_environment.py @@ -2,19 +2,11 @@ import platform import sys -<<<<<<< HEAD -from .metaflow_config import from_conf -from .util import get_username, to_unicode -from . import metaflow_version -from metaflow.exception import MetaflowException -from metaflow.mflog import BASH_MFLOG, BASH_MFLOG_KFP -======= from .util import get_username from . import metaflow_version from metaflow.exception import MetaflowException from metaflow.extension_support import dump_module_info -from metaflow.mflog import BASH_MFLOG ->>>>>>> master +from metaflow.mflog import BASH_MFLOG, BASH_MFLOG_KFP from . import R version_cache = None @@ -87,69 +79,48 @@ def get_client_info(cls, flow_name, metadata): """ return "Local environment" -<<<<<<< HEAD def get_boto3_copy_command(self, s3_path, local_path, command="download_file"): if command == "download_file": copy_command = ( - "boto3.client('s3')" - ".download_file(parsed.netloc, parsed.path.lstrip('/'), '%s')" % local_path + "boto3.client('s3')" + ".download_file(parsed.netloc, parsed.path.lstrip('/'), '%s')" + % local_path ) elif command == "upload_file": copy_command = ( "boto3.client('s3')" - ".upload_file('%s', parsed.netloc, parsed.path.lstrip('/'))" % local_path + ".upload_file('%s', parsed.netloc, parsed.path.lstrip('/'))" + % local_path ) else: raise ValueError("%s not supported" % command) return ( - "%s -c \"import boto3; " % self._python() + '%s -c "import boto3; ' % self._python() + "exec('try:\\n from urlparse import urlparse\\nexcept:\\n from urllib.parse import " - "urlparse'); " + "urlparse'); " + "parsed = urlparse('%s'); " % s3_path - + "%s\"" % copy_command + + '%s"' % copy_command ) def get_package_commands( - self, - code_package_url, - is_kfp_plugin=False, + self, + code_package_url, + is_kfp_plugin=False, ): mflog_bash_cmd = BASH_MFLOG if not is_kfp_plugin else BASH_MFLOG_KFP cmds = [ - mflog_bash_cmd, - "mflog \'Setting up task environment.\'", - "%s -m pip install click requests boto3 -qqq" % self._python(), - "mkdir metaflow", - "cd metaflow", - "mkdir .metaflow", # mute local datastore creation log - "i=0; while [ $i -le 5 ]; do " - "mflog \'Downloading code package...\'; " - "%s && \ - mflog \'Code package downloaded.\' && break; " - "sleep 10; i=$((i+1)); " - "done" % self.get_boto3_copy_command(code_package_url, "job.tar"), - "if [ $i -gt 5 ]; then " - "mflog \'Failed to download code package from %s " - "after 6 tries. 
Exiting...\' && exit 1; " - "fi" % code_package_url, - "tar xf job.tar", - ] -======= - def get_package_commands(self, code_package_url): - cmds = [ - BASH_MFLOG, + mflog_bash_cmd, "mflog 'Setting up task environment.'", - "%s -m pip install awscli requests boto3 -qqq" % self._python(), + "%s -m pip install requests boto3 -qqq" % self._python(), "mkdir metaflow", "cd metaflow", "mkdir .metaflow", # mute local datastore creation log "i=0; while [ $i -le 5 ]; do " "mflog 'Downloading code package...'; " - "%s -m awscli s3 cp %s job.tar >/dev/null && \ - mflog 'Code package downloaded.' && break; " + "%s && mflog 'Code package downloaded.' && break; " "sleep 10; i=$((i+1)); " - "done" % (self._python(), code_package_url), + "done" % self.get_boto3_copy_command(code_package_url, "job.tar"), "if [ $i -gt 5 ]; then " "mflog 'Failed to download code package from %s " "after 6 tries. Exiting...' && exit 1; " @@ -157,7 +128,6 @@ def get_package_commands(self, code_package_url): "TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar", "mflog 'Task is starting.'", ] ->>>>>>> master return cmds def get_environment_info(self): diff --git a/metaflow/mflog/__init__.py b/metaflow/mflog/__init__.py index b64103a7a33..b988038ba48 100644 --- a/metaflow/mflog/__init__.py +++ b/metaflow/mflog/__init__.py @@ -40,22 +40,19 @@ " }" % TASK_LOG_SOURCE ) -<<<<<<< HEAD # In KFP, the same bash function goes through # - quotation marks escape adjustment for proper concatenation -BASH_MFLOG_KFP =\ - 'mflog(){ '\ - 'T=$(date -u -Ins|tr , .); '\ - 'echo \"[MFLOG|0|${T:0:26}Z|%s|$T]$1\"'\ - ' >> $MFLOG_STDOUT; echo $1; '\ - ' }' % TASK_LOG_SOURCE - -BASH_SAVE_LOGS_ARGS = ['python', '-m', 'metaflow.mflog.save_logs'] -BASH_SAVE_LOGS = ' '.join(BASH_SAVE_LOGS_ARGS) -======= +BASH_MFLOG_KFP = ( + "mflog(){ " + "T=$(date -u -Ins|tr , .); " + 'echo "[MFLOG|0|${T:0:26}Z|%s|$T]$1"' + " >> $MFLOG_STDOUT; echo $1; " + " }" % TASK_LOG_SOURCE +) + BASH_SAVE_LOGS_ARGS = ["python", "-m", "metaflow.mflog.save_logs"] BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS) ->>>>>>> master + # this function returns a bash expression that redirects stdout # and stderr of the given command to mflog diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 58e9bc94912..de893da37b7 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -79,25 +79,17 @@ def get_plugin_cli(): from .aws.batch import batch_cli from .aws.eks import kubernetes_cli from .aws.step_functions import step_functions_cli -<<<<<<< HEAD from .kfp import kfp_cli -======= from .cards import card_cli ->>>>>>> master return _ext_plugins["get_plugin_cli"]() + [ package_cli.cli, batch_cli.cli, -<<<<<<< HEAD - step_functions_cli.cli, - kfp_cli.cli] - -======= card_cli.cli, + kfp_cli.cli, kubernetes_cli.cli, step_functions_cli.cli, ] ->>>>>>> master # Add new decorators in this list @@ -108,27 +100,6 @@ def get_plugin_cli(): from .retry_decorator import RetryDecorator from .resources_decorator import ResourcesDecorator from .aws.batch.batch_decorator import BatchDecorator -<<<<<<< HEAD -from .aws.step_functions.step_functions_decorator import StepFunctionsInternalDecorator -from .test_unbounded_foreach_decorator\ - import InternalTestUnboundedForeachDecorator, InternalTestUnboundedForeachInput -from .conda.conda_step_decorator import CondaStepDecorator -from .kfp.kfp_decorator import KfpInternalDecorator -from .kfp.accelerator_decorator import AcceleratorDecorator -from .kfp.s3_sensor_decorator import S3SensorDecorator - -STEP_DECORATORS = 
_merge_lists([CatchDecorator, - TimeoutDecorator, - EnvironmentDecorator, - ResourcesDecorator, - RetryDecorator, - BatchDecorator, - StepFunctionsInternalDecorator, - CondaStepDecorator, - InternalTestUnboundedForeachDecorator, - AcceleratorDecorator, - KfpInternalDecorator], _ext_plugins.STEP_DECORATORS, 'name') -======= from .aws.eks.kubernetes_decorator import KubernetesDecorator from .aws.step_functions.step_functions_decorator import StepFunctionsInternalDecorator from .test_unbounded_foreach_decorator import ( @@ -138,6 +109,8 @@ def get_plugin_cli(): from .conda.conda_step_decorator import CondaStepDecorator from .cards.card_decorator import CardDecorator from .frameworks.pytorch import PytorchParallelDecorator +from .kfp.kfp_decorator import KfpInternalDecorator +from .kfp.accelerator_decorator import AcceleratorDecorator STEP_DECORATORS = [ @@ -154,9 +127,10 @@ def get_plugin_cli(): ParallelDecorator, PytorchParallelDecorator, InternalTestUnboundedForeachDecorator, + AcceleratorDecorator, + KfpInternalDecorator, ] _merge_lists(STEP_DECORATORS, _ext_plugins["STEP_DECORATORS"], "name") ->>>>>>> master # Add Conda environment from .conda.conda_environment import CondaEnvironment @@ -177,17 +151,14 @@ def get_plugin_cli(): from .conda.conda_flow_decorator import CondaFlowDecorator from .aws.step_functions.schedule_decorator import ScheduleDecorator from .project_decorator import ProjectDecorator -<<<<<<< HEAD -FLOW_DECORATORS = _merge_lists([CondaFlowDecorator, - ScheduleDecorator, - S3SensorDecorator, - ProjectDecorator], - _ext_plugins.FLOW_DECORATORS, 'name') - -======= ->>>>>>> master +from .kfp.s3_sensor_decorator import S3SensorDecorator -FLOW_DECORATORS = [CondaFlowDecorator, ScheduleDecorator, ProjectDecorator] +FLOW_DECORATORS = [ + CondaFlowDecorator, + ScheduleDecorator, + ProjectDecorator, + S3SensorDecorator, +] _merge_lists(FLOW_DECORATORS, _ext_plugins["FLOW_DECORATORS"], "name") # Cards diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 2092c9f0dbb..1e6ff193f13 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -4,11 +4,9 @@ import requests import time -from metaflow import util from metaflow import R, current from metaflow.decorators import StepDecorator -from metaflow.plugins import ResourcesDecorator from metaflow.plugins.timeout_decorator import get_run_time_limit_for_task from metaflow.metadata import MetaDatum from metaflow.metadata.util import sync_local_metadata_to_datastore @@ -23,19 +21,8 @@ from metaflow.sidecar import SidecarSubProcess from metaflow.unbounded_foreach import UBF_CONTROL -<<<<<<< HEAD -from metaflow.plugins.resources_decorator import ResourcesDecorator - -try: - # python2 - from urlparse import urlparse -except: # noqa E722 - # python3 - from urllib.parse import urlparse -======= from .batch import BatchException from ..aws_utils import compute_resource_attributes, get_docker_registry ->>>>>>> master class BatchDecorator(StepDecorator): diff --git a/metaflow/plugins/conda/conda_step_decorator.py b/metaflow/plugins/conda/conda_step_decorator.py index 51847f30fcc..0bd1b7ec623 100644 --- a/metaflow/plugins/conda/conda_step_decorator.py +++ b/metaflow/plugins/conda/conda_step_decorator.py @@ -226,11 +226,7 @@ def _disable_safety_checks(self, decos): def _architecture(self, decos): for deco in decos: -<<<<<<< HEAD - if deco.name == 'batch' or deco.name == 'kfp_internal': -======= - if deco.name in ("batch", "kubernetes"): 
->>>>>>> master + if deco.name in ("batch", "kfp_internal", "kubernetes"): # force conda resolution for linux-64 architectures return "linux-64" bit = "32" diff --git a/metaflow/plugins/environment_decorator.py b/metaflow/plugins/environment_decorator.py index 43b8788b7d0..04d714877a2 100644 --- a/metaflow/plugins/environment_decorator.py +++ b/metaflow/plugins/environment_decorator.py @@ -38,14 +38,9 @@ def myStep(self): def my_step(self): ... """ -<<<<<<< HEAD - name = 'environment' - defaults = {'vars': {}, 'kubernetes_vars': None} -======= ->>>>>>> master name = "environment" - defaults = {"vars": {}} + defaults = {"vars": {}, "kubernetes_vars": None} def runtime_step_cli( self, cli_args, retry_count, max_user_code_retries, ubf_context diff --git a/metaflow/plugins/kfp/accelerator_decorator.py b/metaflow/plugins/kfp/accelerator_decorator.py index ddf28d9a431..b4637644620 100644 --- a/metaflow/plugins/kfp/accelerator_decorator.py +++ b/metaflow/plugins/kfp/accelerator_decorator.py @@ -38,6 +38,8 @@ def train(self): "type": None, } - def step_init(self, flow, graph, step, decos, environment, datastore, logger): + def step_init( + self, flow, graph, step_name, decorators, environment, flow_datastore, logger + ): if not self.attributes["type"]: raise MetaflowException("You must specify the type of accelerator.") diff --git a/metaflow/plugins/kfp/kfp.py b/metaflow/plugins/kfp/kfp.py index 64d0d0b85c8..25e8feccc28 100644 --- a/metaflow/plugins/kfp/kfp.py +++ b/metaflow/plugins/kfp/kfp.py @@ -6,28 +6,27 @@ import sys from dataclasses import dataclass from pathlib import Path -from typing import Callable, Dict, List, Optional, Tuple, Union, Any +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import kfp from kfp import dsl -from kfp.dsl import ContainerOp, PipelineConf -from kfp.dsl import PipelineVolume, ResourceOp -from kfp.dsl._container_op import _get_resource_number, _get_cpu_number +from kfp.dsl import ContainerOp, PipelineConf, PipelineVolume, ResourceOp +from kfp.dsl._container_op import _get_cpu_number, _get_resource_number from kfp.dsl._pipeline_param import sanitize_k8s_name from kubernetes.client import ( + V1Affinity, V1EnvVar, V1EnvVarSource, - V1ObjectFieldSelector, - V1ResourceRequirements, - V1PersistentVolumeClaimSpec, - V1OwnerReference, - V1ObjectMeta, - V1PersistentVolumeClaim, - V1Affinity, V1NodeAffinity, V1NodeSelector, - V1NodeSelectorTerm, V1NodeSelectorRequirement, + V1NodeSelectorTerm, + V1ObjectFieldSelector, + V1ObjectMeta, + V1OwnerReference, + V1PersistentVolumeClaim, + V1PersistentVolumeClaimSpec, + V1ResourceRequirements, V1Toleration, ) @@ -35,20 +34,21 @@ from metaflow.metaflow_config import ( DATASTORE_SYSROOT_S3, KFP_TTL_SECONDS_AFTER_FINISHED, - METAFLOW_USER, KFP_USER_DOMAIN, + METAFLOW_USER, from_conf, ) -from metaflow.plugins import KfpInternalDecorator, EnvironmentDecorator -from metaflow.plugins.kfp.kfp_decorator import KfpException +from metaflow.plugins import EnvironmentDecorator, KfpInternalDecorator from metaflow.plugins.kfp.kfp_constants import S3_SENSOR_RETRY_COUNT -from .accelerator_decorator import AcceleratorDecorator -from .kfp_foreach_splits import graph_to_task_ids, KfpForEachSplits -from ..aws.batch.batch_decorator import BatchDecorator -from ..aws.step_functions.schedule_decorator import ScheduleDecorator +from metaflow.plugins.kfp.kfp_decorator import KfpException + from ...graph import DAGNode from ...metaflow_environment import MetaflowEnvironment from ...plugins.resources_decorator import ResourcesDecorator 
+from ..aws.batch.batch_decorator import BatchDecorator +from ..aws.step_functions.schedule_decorator import ScheduleDecorator +from .accelerator_decorator import AcceleratorDecorator +from .kfp_foreach_splits import KfpForEachSplits, graph_to_task_ids # TODO: @schedule UNSUPPORTED_DECORATORS = ( @@ -127,7 +127,7 @@ def __init__( code_package, code_package_url, metadata, - datastore, + flow_datastore, environment, event_logger, monitor, @@ -155,7 +155,7 @@ def __init__( self.code_package = code_package self.code_package_url = code_package_url self.metadata = metadata - self.datastore = datastore + self.flow_datastore = flow_datastore self.environment = environment self.event_logger = event_logger self.monitor = monitor diff --git a/metaflow/plugins/kfp/kfp_cli.py b/metaflow/plugins/kfp/kfp_cli.py index 178e6702d7b..2e5e73f25ba 100644 --- a/metaflow/plugins/kfp/kfp_cli.py +++ b/metaflow/plugins/kfp/kfp_cli.py @@ -2,15 +2,13 @@ import shutil import subprocess -import click - -from metaflow import current, decorators, parameters, JSONType -from metaflow.datastore.datastore import TransformableObject +from metaflow import JSONType, current, decorators, parameters +from metaflow._vendor import click from metaflow.exception import CommandException, MetaflowException from metaflow.metaflow_config import ( + KFP_MAX_PARALLELISM, KFP_SDK_API_NAMESPACE, KFP_SDK_NAMESPACE, - KFP_MAX_PARALLELISM, from_conf, ) from metaflow.package import MetaflowPackage @@ -48,7 +46,7 @@ def kubeflow_pipelines(obj): @click.pass_obj def step_init(obj, run_id, step_name, passed_in_split_indexes, task_id): save_step_environment_variables( - obj.datastore, + obj.flow_datastore, obj.graph, run_id, step_name, @@ -291,7 +289,7 @@ def _convert_value(param: parameters.Parameter): ) ) else: - if s3_code_package and flow.datastore.TYPE != "s3": + if s3_code_package and obj.flow_datastore.TYPE != "s3": raise CommandException( "Kubeflow Pipelines s3-code-package requires --datastore=s3." 
) @@ -301,20 +299,15 @@ def _convert_value(param: parameters.Parameter): bold=True, ) run_pipeline_result = flow.create_run_on_kfp(run_name, flow_parameters) + kfp_run_id = run_pipeline_result.run_id + kfp_run_url = run_id_to_url(kfp_run_id) + metaflow_run_id = f"kfp-{kfp_run_id}" obj.echo("\nRun created successfully!\n") + obj.echo(f"Metaflow run_id=*{metaflow_run_id}* \n", fg="magenta") + obj.echo(f"*Run link:* {kfp_run_url}\n", fg="cyan") - run_id = f"kfp-{run_pipeline_result.run_id}" - obj.echo(f"Metaflow run_id=*{run_id}* \n", fg="magenta") - - kfp_run_url = run_id_to_url(run_pipeline_result.run_id) - - obj.echo( - "*Run link:* {kfp_run_url}\n".format(kfp_run_url=kfp_run_url), - fg="cyan", - ) - - run_info = flow._client.get_run(run_pipeline_result.run_id) + run_info = flow._client.get_run(kfp_run_id) workflow_manifest = json.loads(run_info.pipeline_runtime.workflow_manifest) argo_workflow_name = workflow_manifest["metadata"]["name"] @@ -335,13 +328,13 @@ def _convert_value(param: parameters.Parameter): cmd, shell=True, stdout=subprocess.PIPE, encoding="utf8" ) succeeded = "Succeeded" in ret.stdout - show_status(run_id, kfp_run_url, obj.echo, succeeded) + show_status(metaflow_run_id, kfp_run_url, obj.echo, succeeded) elif wait_for_completion: response = flow._client.wait_for_run_completion( - run_pipeline_result.run_id, timeout=wait_for_completion_timeout + kfp_run_id, timeout=wait_for_completion_timeout ) succeeded = response.run.status == "Succeeded" - show_status(run_id, kfp_run_url, obj.echo, succeeded) + show_status(metaflow_run_id, kfp_run_url, obj.echo, succeeded) def show_status(run_id: str, kfp_run_url: str, echo: callable, succeeded: bool): @@ -383,7 +376,7 @@ def make_flow( # Attach KFP decorator to the flow decorators._attach_decorators(obj.flow, [KfpInternalDecorator.name]) decorators._init_step_decorators( - obj.flow, obj.graph, obj.environment, obj.datastore, obj.logger + obj.flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger ) obj.package = MetaflowPackage( @@ -392,18 +385,11 @@ def make_flow( package_url = None if s3_code_package: - datastore = obj.datastore( - obj.flow.name, - mode="w", - metadata=obj.metadata, - event_logger=obj.event_logger, - monitor=obj.monitor, - ) - package_url = datastore.save_data( - obj.package.sha, TransformableObject(obj.package.blob) - ) + package_url, package_sha = obj.flow_datastore.save_data( + [obj.package.blob], len_hint=1 + )[0] obj.echo( - "*Uploaded package to:* {package_url}".format(package_url=package_url), + f"*Uploaded package to:* {package_url}", fg="cyan", ) @@ -414,7 +400,7 @@ def make_flow( obj.package, package_url, obj.metadata, - obj.datastore, + obj.flow_datastore, obj.environment, obj.event_logger, obj.monitor, diff --git a/metaflow/plugins/kfp/kfp_decorator.py b/metaflow/plugins/kfp/kfp_decorator.py index dda263d9258..58d18ce840c 100644 --- a/metaflow/plugins/kfp/kfp_decorator.py +++ b/metaflow/plugins/kfp/kfp_decorator.py @@ -5,15 +5,13 @@ from urllib.parse import urlparse from metaflow import current, util -from metaflow.datastore import MetaflowDataStore -from metaflow.datastore.util.s3util import get_s3_client +from metaflow.datastore import TaskDataStore from metaflow.decorators import StepDecorator from metaflow.exception import MetaflowException from metaflow.metadata import MetaDatum +from metaflow.metadata.util import sync_local_metadata_to_datastore from metaflow.metaflow_config import DATASTORE_LOCAL_DIR -from metaflow.plugins.kfp.kfp_constants import ( - PRECEDING_COMPONENT_INPUTS_PATH, -) 
+from metaflow.plugins.kfp.kfp_constants import PRECEDING_COMPONENT_INPUTS_PATH from metaflow.plugins.kfp.kfp_foreach_splits import KfpForEachSplits from metaflow.sidecar import SidecarSubProcess @@ -76,10 +74,12 @@ def myStep(self): def __init__(self, attributes=None, statically_defined=False): super(KfpInternalDecorator, self).__init__(attributes, statically_defined) - def step_init(self, flow, graph, step, decos, environment, datastore, logger): + def step_init( + self, flow, graph, step_name, decorators, environment, flow_datastore, logger + ): if self.attributes["preceding_component"] is not None: - node = graph[step] - if step == "start": + node = graph[step_name] + if step_name == "start": raise KfpException( "A @kfp preceding_component cannot be on the start step." ) @@ -89,7 +89,7 @@ def step_init(self, flow, graph, step, decos, environment, datastore, logger): "The incoming step of a @kfp with a preceding_component must be linear." ) - self.datastore = datastore + self.flow_datastore = flow_datastore self.logger = logger # Add env vars from the optional @environment decorator. @@ -97,7 +97,7 @@ def step_init(self, flow, graph, step, decos, environment, datastore, logger): # ref: step function is also handling environment decorator ad-hoc # See plugins/aws/step_functions/step_functions.StepFunctions._batch env_deco = [ - deco for deco in graph[step].decorators if deco.name == "environment" + deco for deco in graph[step_name].decorators if deco.name == "environment" ] if env_deco: os.environ.update(env_deco[0].attributes["vars"].items()) @@ -105,7 +105,7 @@ def step_init(self, flow, graph, step, decos, environment, datastore, logger): def task_pre_step( self, step_name, - datastore, + task_datastore, metadata, run_id, task_id, @@ -119,19 +119,15 @@ def task_pre_step( Analogous to step_functions_decorator.py Invoked from Task.run_step within the KFP container """ + self.metadata = metadata + self.task_datastore = task_datastore + # TODO: any other KFP environment variables to get and register to Metadata service? meta = {"kfp-execution": run_id} entries = [ MetaDatum(field=k, value=v, type=k, tags=[]) for k, v in meta.items() ] - metadata.register_metadata(run_id, step_name, task_id, entries) - self._save_logs_sidecar = SidecarSubProcess("save_logs_periodically") - - if metadata.TYPE == "local": - self.ds_root = datastore.root - else: - self.ds_root = None preceding_component_outputs: List[str] = json.loads( os.environ["PRECEDING_COMPONENT_OUTPUTS"] @@ -143,6 +139,8 @@ def task_pre_step( field_value = os.environ[field] flow.__setattr__(field, field_value) + self._save_logs_sidecar = SidecarSubProcess("save_logs_periodically") + def task_finished( self, step_name, @@ -150,12 +148,17 @@ def task_finished( graph, is_task_ok, retry_count, - max_user_code_retries, + max_retries, ): """ Analogous to step_functions_decorator.py Invoked from Task.run_step within the KFP container """ + if self.metadata.TYPE == "local": + # Note that the datastore is *always* Amazon S3 (see + # runtime_task_created function). + sync_local_metadata_to_datastore(DATASTORE_LOCAL_DIR, self.task_datastore) + if not is_task_ok: # The task finished with an exception - execution won't # continue so no need to do anything here. @@ -173,34 +176,6 @@ def task_finished( } json.dump(fields_dictionary, file) - # TODO: Could we copy [context file, metadata.tgz, stdout files] in - # parallel using the S3 client shaving off a few seconds for every - # task?? 
These seconds add up when running lightweight Metaflow - # tests on KFP. - if self.ds_root: - # We have a local metadata service so we need to persist it to the datastore. - # Note that the datastore is *always* s3 (see runtime_task_created function) - with util.TempDir() as td: - tar_file_path = os.path.join(td, "metadata.tgz") - with tarfile.open(tar_file_path, "w:gz") as tar: - # The local metadata is stored in the local datastore - # which, for batch jobs, is always the DATASTORE_LOCAL_DIR - tar.add(DATASTORE_LOCAL_DIR) - # At this point we upload what need to s3 - s3, _ = get_s3_client() - with open(tar_file_path, "rb") as f: - path = os.path.join( - self.ds_root, - MetaflowDataStore.filename_with_attempt_prefix( - "metadata.tgz", retry_count - ), - ) - url = urlparse(path) - s3.upload_fileobj(f, url.netloc, url.path.lstrip("/")) - else: - # we are publishing to a Metadata service - pass - if graph[step_name].type == "foreach": # Save context to S3 for downstream DAG steps to access this # step's foreach_splits @@ -208,7 +183,7 @@ def task_finished( graph, step_name, current.run_id, - self.datastore, + self.flow_datastore, self.logger, ) as split_contexts: foreach_splits: Dict = split_contexts.build_foreach_splits(flow) diff --git a/metaflow/plugins/kfp/kfp_exit_handler.py b/metaflow/plugins/kfp/kfp_exit_handler.py index 06cf720ce6e..125d9a2de75 100644 --- a/metaflow/plugins/kfp/kfp_exit_handler.py +++ b/metaflow/plugins/kfp/kfp_exit_handler.py @@ -1,8 +1,7 @@ -import click - - from typing import Dict +from metaflow._vendor import click + @click.command() @click.option("--flow_name") @@ -36,10 +35,10 @@ def get_env(name, default=None) -> str: return notify_variables.get(name, os.environ.get(name, default=default)) def email_notify(send_to): - import smtplib import posixpath - from email.mime.text import MIMEText + import smtplib from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText from email.utils import formatdate smtp_host = get_env("METAFLOW_NOTIFY_EMAIL_SMTP_HOST") diff --git a/metaflow/plugins/kfp/kfp_foreach_splits.py b/metaflow/plugins/kfp/kfp_foreach_splits.py index c769cbecde3..c82fbda118a 100644 --- a/metaflow/plugins/kfp/kfp_foreach_splits.py +++ b/metaflow/plugins/kfp/kfp_foreach_splits.py @@ -3,7 +3,7 @@ from typing import Callable, Dict, List from metaflow import S3, FlowSpec, current -from metaflow.datastore import MetaflowDataStore +from metaflow.datastore import FlowDataStore, S3Storage from metaflow.graph import DAGNode, FlowGraph from metaflow.plugins.kfp.kfp_constants import ( KFP_METAFLOW_FOREACH_SPLITS_PATH, @@ -55,7 +55,7 @@ def __init__( graph: FlowGraph, step_name: str, run_id: str, - datastore: MetaflowDataStore, + flow_datastore: FlowDataStore, logger: Callable, ): self.graph = graph @@ -63,7 +63,7 @@ def __init__( self.run_id = run_id self.logger = logger self.node = graph[step_name] - self.flow_root = datastore.make_path(graph.name, run_id) + self.flow_datastore = flow_datastore self.step_to_task_id: Dict[str, str] = graph_to_task_ids(graph) self.s3 = S3() @@ -123,10 +123,11 @@ def get_foreach_splits( parent_context_step_name, current_node, passed_in_split_indexes ) - foreach_splits_path = self._build_foreach_splits_path( + foreach_splits_path = self._build_foreach_splits_prefix( parent_context_step_name, context_node_task_id ) - input_context = json.loads(self.s3.get(foreach_splits_path).text) + s3_datastore: S3Storage = self.flow_datastore._storage_impl + input_context = 
json.loads(s3_datastore.load_bytes([foreach_splits_path])[0]) return input_context["foreach_splits"] @@ -182,19 +183,32 @@ def save_foreach_splits_to_local_fs(foreach_splits: Dict): json.dump(foreach_splits, file) def upload_foreach_splits_to_flow_root(self, foreach_splits: Dict): - foreach_splits_path = self._build_foreach_splits_path( - self.step_name, current.task_id + # Only S3_datastore is supported for KFP plug-in. + # Safely assume _storage_impl is of type S3Storage here + s3_datastore: S3Storage = self.flow_datastore._storage_impl + s3_datastore.save_bytes( + path_and_bytes_iter=[ + ( + self._build_foreach_splits_prefix(self.step_name, current.task_id), + json.dumps(foreach_splits), + ) + ], + overwrite=True, + len_hint=1, ) - self.s3.put(foreach_splits_path, json.dumps(foreach_splits)) @staticmethod def get_step_task_id(task_id: str, passed_in_split_indexes: str) -> str: return f"{task_id}.{passed_in_split_indexes}".strip(".") - def _build_foreach_splits_path(self, step_name: str, task_id: str) -> str: - # returns: s3:///foreach_splits/{task_id}.{step_name}.json - s3_path = os.path.join( - os.path.join(self.flow_root, "foreach_splits"), + def _build_foreach_splits_prefix(self, step_name: str, task_id: str) -> str: + """For foreach splits generate file prefix used for datastore""" + # Save to `///foreach_splits/{task_id}.{step_name}.json` + # S3Storage.datastore_root: `s3://` + # Key: `//foreach_splits/{task_id}.{step_name}.json` + return os.path.join( + self.flow_datastore.flow_name, + current.run_id, + "foreach_splits", f"{task_id}.{step_name}.json", ) - return s3_path diff --git a/metaflow/plugins/kfp/kfp_get_workflow_uid.py b/metaflow/plugins/kfp/kfp_get_workflow_uid.py index 0da00d84588..780da279a72 100644 --- a/metaflow/plugins/kfp/kfp_get_workflow_uid.py +++ b/metaflow/plugins/kfp/kfp_get_workflow_uid.py @@ -1,12 +1,13 @@ -import click - -import pathlib import os +import pathlib + from kubernetes import config from kubernetes.client import api_client from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.resource import Resource, ResourceInstance +from metaflow._vendor import click + @click.command() @click.option("--workflow_name") diff --git a/metaflow/plugins/kfp/kfp_metaflow_step.py b/metaflow/plugins/kfp/kfp_metaflow_step.py index 09a486a258b..8d78efddf73 100644 --- a/metaflow/plugins/kfp/kfp_metaflow_step.py +++ b/metaflow/plugins/kfp/kfp_metaflow_step.py @@ -1,27 +1,25 @@ -import pathlib -from typing import List - -import os import json import logging +import os +import pathlib from subprocess import Popen from typing import Dict, List -import click - -from metaflow.mflog import bash_capture_logs, export_mflog_env_vars, BASH_SAVE_LOGS +from metaflow._vendor import click +from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars from metaflow.plugins.kfp.kfp_constants import ( - STEP_ENVIRONMENT_VARIABLES, - LOGS_DIR, - STDOUT_PATH, - STDERR_PATH, - TASK_ID_ENV_NAME, - SPLIT_INDEX_ENV_NAME, INPUT_PATHS_ENV_NAME, - RETRY_COUNT, KFP_METAFLOW_FOREACH_SPLITS_PATH, + LOGS_DIR, PRECEDING_COMPONENT_INPUTS_PATH, + RETRY_COUNT, + SPLIT_INDEX_ENV_NAME, + STDERR_PATH, + STDOUT_PATH, + STEP_ENVIRONMENT_VARIABLES, + TASK_ID_ENV_NAME, ) + from ... 
import R diff --git a/metaflow/plugins/kfp/kfp_s3_sensor.py b/metaflow/plugins/kfp/kfp_s3_sensor.py index 57538f4dc53..1d6460db0b4 100644 --- a/metaflow/plugins/kfp/kfp_s3_sensor.py +++ b/metaflow/plugins/kfp/kfp_s3_sensor.py @@ -4,22 +4,19 @@ (2) It splits the formatted path into an S3 bucket and key (3) It polls for an object with the specified bucket and key until timeout """ -from email.policy import default -import click - -import os -import pathlib -from typing import Dict -import botocore import base64 import json import marshal +import os +import pathlib import time -from urllib.parse import urlparse, ParseResult +from typing import Dict, Tuple +from urllib.parse import ParseResult, urlparse -from typing import Tuple +import botocore -from metaflow.datastore.util.s3util import get_s3_client +from metaflow._vendor import click +from metaflow.datatools.s3util import get_s3_client def construct_elapsed_time_s3_bucket_and_key( diff --git a/metaflow/plugins/kfp/kfp_step_init.py b/metaflow/plugins/kfp/kfp_step_init.py index 3c146a5b921..9809a7efb08 100644 --- a/metaflow/plugins/kfp/kfp_step_init.py +++ b/metaflow/plugins/kfp/kfp_step_init.py @@ -1,5 +1,5 @@ import os -from collections import Callable +from typing import Callable from metaflow.graph import DAGNode, FlowGraph from metaflow.plugins.kfp.kfp_constants import ( @@ -13,7 +13,7 @@ def save_step_environment_variables( - datastore, + flow_datastore, graph: FlowGraph, run_id: str, step_name: str, @@ -31,7 +31,7 @@ def save_step_environment_variables( by Metaflow step command line arguments. """ with KfpForEachSplits( - graph, step_name, run_id, datastore, logger + graph, step_name, run_id, flow_datastore, logger ) as split_contexts: environment_exports = { # The step task_id diff --git a/metaflow/plugins/kfp/kfp_utils.py b/metaflow/plugins/kfp/kfp_utils.py index 3870782f734..cddc282ae1e 100644 --- a/metaflow/plugins/kfp/kfp_utils.py +++ b/metaflow/plugins/kfp/kfp_utils.py @@ -8,9 +8,8 @@ import datetime import json import logging -import os - import math +import os import posixpath import sys import time @@ -21,6 +20,7 @@ from metaflow.util import get_username try: # Extra required dependency specific to kfp plug-in may not exists + import kfp_server_api from kfp import Client as KFPClient from kfp_server_api import ( ApiExperiment, @@ -29,7 +29,6 @@ ApiRun, RunServiceApi, ) - import kfp_server_api except ImportError: # Silence import errors in type hint KFPClient = None ApiExperiment = None diff --git a/metaflow/plugins/kfp/nested_parallelfor.ipynb b/metaflow/plugins/kfp/nested_parallelfor.ipynb index 8f768fade9c..81228305309 100644 --- a/metaflow/plugins/kfp/nested_parallelfor.ipynb +++ b/metaflow/plugins/kfp/nested_parallelfor.ipynb @@ -18,14 +18,19 @@ "import kfp\n", "from kfp import compiler, dsl\n", "from kfp.dsl import ContainerOp, python_component, RUN_ID_PLACEHOLDER\n", - "from kfp.components import func_to_container_op, func_to_component_text, load_component_from_text\n", + "from kfp.components import (\n", + " func_to_container_op,\n", + " func_to_component_text,\n", + " load_component_from_text,\n", + ")\n", "\n", "\n", "def pipeline_transformer(op: ContainerOp):\n", " op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n", " op.container.set_cpu_request(\"150m\")\n", - " op.container.set_cpu_limit(\"150m\") # please ensure the limit and requests are equal\n", - " \n", + " op.container.set_cpu_limit(\"150m\") # please ensure the limit and requests are equal\n", + "\n", + "\n", "def named_op(name: str, 
func: callable) -> ContainerOp:\n", " text = yaml.load(func_to_component_text(func), yaml.SafeLoader)\n", " text[\"name\"] = name\n", @@ -42,14 +47,16 @@ " print(\"passed_in_split_indexes\", passed_in_split_indexes)\n", " return [f\"{passed_in_split_indexes}_{i}\".strip(\"_\") for i in range(1, 3)]\n", "\n", + "\n", "@func_to_container_op\n", "def inner_step(passed_in_split_indexes) -> list:\n", " print(passed_in_split_indexes)\n", " return None\n", "\n", + "\n", "def join_results(passed_in_split_indexes=\"\") -> list:\n", " print(\"passed_in_split_indexes\", passed_in_split_indexes)\n", - " return None " + " return None" ], "outputs": [], "metadata": {} @@ -81,29 +88,30 @@ "execution_count": 13, "source": [ "@kfp.dsl.pipeline(name=\"metaflow_foreach\", description=\"test\")\n", - "def map_reduce_pipeline(): \n", + "def map_reduce_pipeline():\n", " op1_foreach = named_op(\"foreach1\", foreach_step)(\"\")\n", - " with kfp.dsl.ParallelFor(op1_foreach.output) as split_1: \n", + " with kfp.dsl.ParallelFor(op1_foreach.output) as split_1:\n", " # ** The split_1 becomes inner step Metaflow task_ids\n", - " \n", + "\n", " # foreach\n", " op2_foreach = named_op(\"foreach2\", foreach_step)(split_1)\n", " with kfp.dsl.ParallelFor(op2_foreach.output) as split_2:\n", " op3_inner = inner_step(split_2)\n", " # ** Join on op2_foreach with split_1\n", " op_join2 = named_op(\"op2_foreach join\", join_results)(split_1).after(op3_inner)\n", - " \n", + "\n", " # ** Join op1_foreach, but we are not inside a foreach, so no split\n", " named_op(\"op1_foreach joinop1_foreach join\", join_results)().after(op_join2)\n", - " \n", + "\n", " dsl.get_pipeline_conf().add_op_transformer(pipeline_transformer)\n", "\n", + "\n", "kfp.Client(userid=\"talebz@zillowgroup.com\").create_run_from_pipeline_func(\n", " map_reduce_pipeline,\n", " arguments={},\n", " run_name=\"nested_join\",\n", " experiment_name=\"nested_foreaches\",\n", - " namespace=\"aip-example\"\n", + " namespace=\"aip-example\",\n", ")" ], "outputs": [ @@ -173,38 +181,41 @@ "source": [ "# nested loops\n", "\n", + "\n", "@kfp.dsl.pipeline(name=\"test\", description=\"test\")\n", - "def map_reduce_pipeline(): \n", + "def map_reduce_pipeline():\n", " op1_foreach = named_op(\"foreach1\", foreach_step)(\"\")\n", - " with kfp.dsl.ParallelFor(op1_foreach.output) as split_1: \n", + " with kfp.dsl.ParallelFor(op1_foreach.output) as split_1:\n", " # ** The split_1 becomes inner step Metaflow task_ids\n", - " \n", + "\n", " # foreach\n", " op2_foreach = named_op(\"foreach2\", foreach_step)(split_1)\n", " with kfp.dsl.ParallelFor(op2_foreach.output) as split_2:\n", " # foreach\n", " op3_foreach = named_op(\"foreach3\", foreach_step)(split_2)\n", " with kfp.dsl.ParallelFor(op3_foreach.output) as split_3:\n", - " op3_inner = inner_step(split_3) \n", - " \n", + " op3_inner = inner_step(split_3)\n", + "\n", " # ** Join on op2_foreach with split_1\n", - " op_join3 = named_op(\"op3_foreach join\", join_results)(split_2).after(op3_inner)\n", - " \n", - " \n", - " # ** Join on op2_foreach with split_1 \n", + " op_join3 = named_op(\"op3_foreach join\", join_results)(split_2).after(\n", + " op3_inner\n", + " )\n", + "\n", + " # ** Join on op2_foreach with split_1\n", " op_join2 = named_op(\"op2_foreach join\", join_results)(split_1).after(op_join3)\n", - " \n", + "\n", " # ** Join op1_foreach, but we are not inside a foreach, so no split\n", " named_op(\"op1_foreach join\", join_results)().after(op_join2)\n", - " \n", + "\n", " 
dsl.get_pipeline_conf().add_op_transformer(pipeline_transformer)\n", "\n", + "\n", "kfp.Client(userid=\"talebz@zillowgroup.com\").create_run_from_pipeline_func(\n", " map_reduce_pipeline,\n", " arguments={},\n", " run_name=\"map_reduce_pipeline\",\n", " experiment_name=\"kf-1.0-test-experiment\",\n", - " namespace=\"aip-example\"\n", + " namespace=\"aip-example\",\n", ")" ], "outputs": [ diff --git a/metaflow/plugins/kfp/s3_sensor_decorator.py b/metaflow/plugins/kfp/s3_sensor_decorator.py index 835fafd5f5d..66bcf9b33a9 100644 --- a/metaflow/plugins/kfp/s3_sensor_decorator.py +++ b/metaflow/plugins/kfp/s3_sensor_decorator.py @@ -1,10 +1,10 @@ -from metaflow.decorators import FlowDecorator -from metaflow.exception import MetaflowException - from types import FunctionType from typing import Tuple from urllib.parse import urlparse +from metaflow.decorators import FlowDecorator +from metaflow.exception import MetaflowException + """ Within identity_formatter, which is passed in as the path_formatter parameter, customers have access to all variables in flow_parameters_json (which @@ -78,7 +78,9 @@ class S3SensorDecorator(FlowDecorator): "os_expandvars": False, } - def flow_init(self, flow, graph, environment, datastore, logger, echo, options): + def flow_init( + self, flow, graph, environment, flow_datastore, logger, echo, options + ): self.path = self.attributes["path"] self.timeout_seconds = self.attributes["timeout_seconds"] self.polling_interval_seconds = self.attributes["polling_interval_seconds"] diff --git a/metaflow/plugins/kfp/set_batch_environment.py b/metaflow/plugins/kfp/set_batch_environment.py index e1ebfb34255..fcd6e87ad66 100644 --- a/metaflow/plugins/kfp/set_batch_environment.py +++ b/metaflow/plugins/kfp/set_batch_environment.py @@ -1,7 +1,7 @@ import json import os -import click +from metaflow._vendor import click @click.group() diff --git a/metaflow/plugins/kfp/tests/flows/check_error_handling_flow.py b/metaflow/plugins/kfp/tests/flows/check_error_handling_flow.py index f0315827ad4..b19f8e488de 100644 --- a/metaflow/plugins/kfp/tests/flows/check_error_handling_flow.py +++ b/metaflow/plugins/kfp/tests/flows/check_error_handling_flow.py @@ -1,4 +1,4 @@ -from metaflow import Parameter, FlowSpec, step, Step +from metaflow import FlowSpec, Parameter, Step, step class CheckErrorHandlingFlow(FlowSpec): diff --git a/metaflow/plugins/kfp/tests/flows/failure_flow.py b/metaflow/plugins/kfp/tests/flows/failure_flow.py index 16138e9dd7d..6b009f25fa5 100644 --- a/metaflow/plugins/kfp/tests/flows/failure_flow.py +++ b/metaflow/plugins/kfp/tests/flows/failure_flow.py @@ -3,16 +3,7 @@ import subprocess import time -from metaflow import ( - FlowSpec, - step, - retry, - catch, - timeout, - current, - Step, - Parameter, -) +from metaflow import FlowSpec, Parameter, Step, catch, current, retry, step, timeout from metaflow.exception import MetaflowExceptionWrapper diff --git a/metaflow/plugins/kfp/tests/flows/flow_triggering_flow.py b/metaflow/plugins/kfp/tests/flows/flow_triggering_flow.py index bf0753d8c9f..5e5c0b122f9 100644 --- a/metaflow/plugins/kfp/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/kfp/tests/flows/flow_triggering_flow.py @@ -1,3 +1,5 @@ +import datetime + from metaflow import FlowSpec, Parameter, Step, current, step from metaflow.metaflow_config import KFP_SDK_NAMESPACE from metaflow.plugins.kfp import ( @@ -11,7 +13,6 @@ _upload_pipeline, to_metaflow_run_id, ) -import datetime TEST_PIPELINE_NAME = "metaflow-unit-test-flow-triggering-flow" logger.handlers.clear() # 
Avoid double printint logs TODO (yunw) address logging story diff --git a/metaflow/plugins/kfp/tests/flows/foreach_linear_foreach.py b/metaflow/plugins/kfp/tests/flows/foreach_linear_foreach.py index fd0ff1523b1..f08d8339601 100644 --- a/metaflow/plugins/kfp/tests/flows/foreach_linear_foreach.py +++ b/metaflow/plugins/kfp/tests/flows/foreach_linear_foreach.py @@ -1,4 +1,4 @@ -from metaflow import FlowSpec, step, current, Step +from metaflow import FlowSpec, Step, current, step class ForeachLinearForeach(FlowSpec): diff --git a/metaflow/plugins/kfp/tests/flows/kfp_flow.py b/metaflow/plugins/kfp/tests/flows/kfp_flow.py index f5441a80e21..a911f96d84a 100644 --- a/metaflow/plugins/kfp/tests/flows/kfp_flow.py +++ b/metaflow/plugins/kfp/tests/flows/kfp_flow.py @@ -1,9 +1,10 @@ -from typing import NamedTuple import os +from typing import NamedTuple -from metaflow import FlowSpec, step, kfp, resources -from kubernetes import client, config from kfp.components import func_to_container_op +from kubernetes import client, config + +from metaflow import FlowSpec, kfp, resources, step def div_mod( diff --git a/metaflow/plugins/kfp/tests/flows/merge_artifacts.py b/metaflow/plugins/kfp/tests/flows/merge_artifacts.py index 3c961ac9d04..5b33a353a3a 100644 --- a/metaflow/plugins/kfp/tests/flows/merge_artifacts.py +++ b/metaflow/plugins/kfp/tests/flows/merge_artifacts.py @@ -1,7 +1,7 @@ -from metaflow import FlowSpec, step - import pytest +from metaflow import FlowSpec, step + class MergeArtifacts(FlowSpec): """ diff --git a/metaflow/plugins/kfp/tests/flows/metadata_flow.py b/metaflow/plugins/kfp/tests/flows/metadata_flow.py index e3743fa29bc..8234607c9d8 100644 --- a/metaflow/plugins/kfp/tests/flows/metadata_flow.py +++ b/metaflow/plugins/kfp/tests/flows/metadata_flow.py @@ -1,8 +1,8 @@ -from metaflow import FlowSpec, step, current, Step - import os from random import random +from metaflow import FlowSpec, Step, current, step + class MetadataFlow(FlowSpec): start_message = "MetadataFlow is starting." diff --git a/metaflow/plugins/kfp/tests/flows/resources_flow.py b/metaflow/plugins/kfp/tests/flows/resources_flow.py index 5b00b551c94..17549565e5f 100644 --- a/metaflow/plugins/kfp/tests/flows/resources_flow.py +++ b/metaflow/plugins/kfp/tests/flows/resources_flow.py @@ -1,5 +1,4 @@ import os -import click import pprint import subprocess import time @@ -12,14 +11,8 @@ V1ResourceFieldSelector, ) -from metaflow import ( - FlowSpec, - step, - environment, - resources, - current, - Parameter, -) +from metaflow import FlowSpec, Parameter, current, environment, resources, step +from metaflow._vendor import click def get_env_vars(env_resources: Dict[str, str]) -> List[V1EnvVar]: diff --git a/metaflow/plugins/kfp/tests/flows/s3_sensor_flow.py b/metaflow/plugins/kfp/tests/flows/s3_sensor_flow.py index bc6144818a9..35accf2f072 100644 --- a/metaflow/plugins/kfp/tests/flows/s3_sensor_flow.py +++ b/metaflow/plugins/kfp/tests/flows/s3_sensor_flow.py @@ -1,7 +1,7 @@ -from metaflow import FlowSpec, step, resources, s3_sensor, Parameter - from os.path import join +from metaflow import FlowSpec, Parameter, resources, s3_sensor, step + """ This test flow ensures that @s3_sensor properly waits for path to be written to in S3. 
In particular, this test ensures environment variables are correctly substituted diff --git a/metaflow/plugins/kfp/tests/flows/s3_sensor_with_formatter_flow.py b/metaflow/plugins/kfp/tests/flows/s3_sensor_with_formatter_flow.py index 473f2c4cdd9..15507f866fc 100644 --- a/metaflow/plugins/kfp/tests/flows/s3_sensor_with_formatter_flow.py +++ b/metaflow/plugins/kfp/tests/flows/s3_sensor_with_formatter_flow.py @@ -1,8 +1,8 @@ -from metaflow import FlowSpec, step, resources, s3_sensor, Parameter - from os.path import join from typing import Dict +from metaflow import FlowSpec, Parameter, resources, s3_sensor, step + """ This test flow ensures that @s3_sensor properly waits for path to be written to in S3. In particular, this test ensures `path_formatter` works and users are diff --git a/metaflow/plugins/kfp/tests/flows/toleration_and_affinity_flow.py b/metaflow/plugins/kfp/tests/flows/toleration_and_affinity_flow.py index 0d855591d06..61b64ff0e35 100644 --- a/metaflow/plugins/kfp/tests/flows/toleration_and_affinity_flow.py +++ b/metaflow/plugins/kfp/tests/flows/toleration_and_affinity_flow.py @@ -1,4 +1,4 @@ -from metaflow import FlowSpec, step, resources, accelerator +from metaflow import FlowSpec, accelerator, resources, step class TolerationAndAffinityFlow(FlowSpec): diff --git a/metaflow/plugins/kfp/tests/flows/validate_s3_sensor_flows.py b/metaflow/plugins/kfp/tests/flows/validate_s3_sensor_flows.py index eb4641b0f98..2b3ad409e9e 100644 --- a/metaflow/plugins/kfp/tests/flows/validate_s3_sensor_flows.py +++ b/metaflow/plugins/kfp/tests/flows/validate_s3_sensor_flows.py @@ -1,20 +1,17 @@ -from metaflow import FlowSpec, step, resources, s3_sensor, Parameter - -import botocore import time -from subprocess import run, PIPE - from os import environ from os.path import join +from subprocess import PIPE, run +from urllib.parse import ParseResult, urlparse -from urllib.parse import urlparse, ParseResult - +import botocore from kubernetes import config from kubernetes.client import api_client from kubernetes.dynamic import DynamicClient from kubernetes.dynamic.resource import Resource, ResourceInstance -from metaflow.datastore.util.s3util import get_s3_client +from metaflow import FlowSpec, Parameter, resources, s3_sensor, step +from metaflow.datatools.s3util import get_s3_client """ This test flow validates the execution of s3_sensor_flow.py and diff --git a/metaflow/plugins/kfp/tests/run_integration_tests.py b/metaflow/plugins/kfp/tests/run_integration_tests.py index f03aadcde20..bf92b2dbd95 100644 --- a/metaflow/plugins/kfp/tests/run_integration_tests.py +++ b/metaflow/plugins/kfp/tests/run_integration_tests.py @@ -1,19 +1,18 @@ +import json +import re import tempfile +import time +import uuid from os import listdir from os.path import isfile, join +from subprocess import CompletedProcess +from typing import Dict, List, Tuple -import json +import pytest import requests -from requests.models import Response import yaml -from subprocess import CompletedProcess +from requests.models import Response from subprocess_tee import run -import re -from typing import List, Dict, Tuple - -import pytest -import time -import uuid from metaflow import R from metaflow.exception import MetaflowException diff --git a/metaflow/plugins/kfp/tests/test_kfp_metaflow_step.py b/metaflow/plugins/kfp/tests/test_kfp_metaflow_step.py index 02867fdb857..081b23e7f59 100644 --- a/metaflow/plugins/kfp/tests/test_kfp_metaflow_step.py +++ b/metaflow/plugins/kfp/tests/test_kfp_metaflow_step.py @@ -1,10 +1,10 @@ -import pytest -from 
unittest.mock import patch, Mock - from typing import List +from unittest.mock import Mock, patch + +import pytest -from metaflow.plugins.kfp.kfp_metaflow_step import _step_cli, _command -from metaflow.plugins.kfp.kfp_constants import STDOUT_PATH, STDERR_PATH +from metaflow.plugins.kfp.kfp_constants import STDERR_PATH, STDOUT_PATH +from metaflow.plugins.kfp.kfp_metaflow_step import _command, _step_cli """ To run these tests from your terminal, go to the root directory and run: diff --git a/metaflow/plugins/kfp/tests/test_kfp_s3_sensor.py b/metaflow/plugins/kfp/tests/test_kfp_s3_sensor.py index e4151926e86..84c3b210f03 100644 --- a/metaflow/plugins/kfp/tests/test_kfp_s3_sensor.py +++ b/metaflow/plugins/kfp/tests/test_kfp_s3_sensor.py @@ -1,19 +1,16 @@ +import base64 +import marshal +import os +import tempfile from unittest import mock -from metaflow.plugins.kfp.kfp_s3_sensor import wait_for_s3_path - -from unittest.mock import call, Mock, patch, PropertyMock -import pytest +from unittest.mock import Mock, PropertyMock, call, patch import boto3 +import pytest from botocore.exceptions import ClientError from moto import mock_s3 -import tempfile - -import base64 -import marshal - -import os +from metaflow.plugins.kfp.kfp_s3_sensor import wait_for_s3_path """ To run these tests from your terminal, go to the root directory and run: diff --git a/metaflow/plugins/resources_decorator.py b/metaflow/plugins/resources_decorator.py index d9b34bc8cba..7cabcbb0916 100644 --- a/metaflow/plugins/resources_decorator.py +++ b/metaflow/plugins/resources_decorator.py @@ -4,19 +4,12 @@ class ResourcesDecorator(StepDecorator): """ Step decorator to specify the resources needed when executing this step. -<<<<<<< HEAD - This decorator passes this information along when requesting resources - to execute this step. - This decorator can be used for two purpose: - When using with AWS Batch decorator, only 'cpu', 'gpu', and 'memory' - parameters are supported. - When using for Kubeflow Pipeline, 'cpu', and 'memory' sets resource requests; - 'cpu_limit', 'gpu', 'memory_limit' sets resource limit. - For more details please refer to - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ - This decorator is ignored if the execution of the step does not happen on - Batch or Kubeflow Pipeline + This decorator passes this information along to container orchestrator + (AWS Batch, Kubernetes, etc.) when requesting resources to execute this + step. + + This decorator is ignored if the execution of the step happens locally. To use, annotate your step as follows: ``` @@ -39,17 +32,13 @@ def myStep(self): Parameters ---------- cpu : Union[int, float, str] - AWS Batch: Number of CPUs required for this step. Defaults to 1. + AWS Batch: Number of CPUs required for this step. Defaults to 1 in batch decorator. Must be integer. - KFP: Number of CPUs required for this step. Defaults to 1. + KFP: Number of CPUs required for this step. Defaults to None - use cluster setting. Accept int, float, or str. Support millicpu requests using float or string ending in 'm'. Requests with decimal points, like 0.1, are converted to 100m by kfp Precision finer than 1m is not allowed. - cpu_limit : Union[int, float, str] - Not for AWS Batch. - KFP: Number of CPUs limited for this step. - Defaults None - relying on Kubernetes defaults. gpu : int AWS Batch: Number of GPUs required for this step. Defaults to 0. KFP: GPU limit for this step. Defaults to 0. @@ -59,13 +48,9 @@ def myStep(self): Not for AWS Batch. 
KFP: "nvidia" or "amd". Defaults to "nvidia". memory : Union[int, str] - AWS Batch: Memory size (in MB) required for this step. Defaults to 4000. - KFP: Memory required for this step. Default to 4000 MB. + AWS Batch: Memory size (in MB) required for this step. Defaults to 4096 in batch decortor. + KFP: Memory required for this step. Default to None - use cluster setting. See notes above for more units. - memory_limit : Union[int, str] - Not for AWS Batch. - KFP: Memory limit for this step. Default unit is MB - see notes above. - Defaults None - relying on Kubernetes defaults. shared_memory : int Not for KFP AWS Batch: The value for the size (in MiB) of the /dev/shm volume for this step. @@ -98,7 +83,8 @@ def myStep(self): volume_dir: str Default "/opt/metaflow_volume" """ - name = 'resources' + + name = "resources" # Actual defaults are set in .aws.batch.batch_decorator.BatchDecorator and # .kfp.kfp.KubeflowPipelines._get_resource_requirements respectively. @@ -108,45 +94,12 @@ def myStep(self): "cpu": None, "gpu": None, "memory": None, - # Only AWS Batch supported attributes - 'shared_memory': None, - + "shared_memory": None, # Only KFP supported attributes "gpu_vendor": None, "local_storage": None, "volume": None, "volume_mode": "ReadWriteOnce", - "volume_dir": "/opt/metaflow_volume" + "volume_dir": "/opt/metaflow_volume", } -======= - - This decorator passes this information along to container orchestrator - (AWS Batch, Kubernetes, etc.) when requesting resources to execute this - step. - - This decorator is ignored if the execution of the step happens locally. - - To use, annotate your step as follows: - ``` - @resources(cpu=32) - @step - def my_step(self): - ... - ``` - Parameters - ---------- - cpu : int - Number of CPUs required for this step. Defaults to 1 - gpu : int - Number of GPUs required for this step. Defaults to 0 - memory : int - Memory size (in MB) required for this step. Defaults to 4096 - shared_memory : int - The value for the size (in MiB) of the /dev/shm volume for this step. - This parameter maps to the --shm-size option to docker run . - """ - - name = "resources" - defaults = {"cpu": "1", "gpu": "0", "memory": "4096", "shared_memory": None} ->>>>>>> master diff --git a/metaflow/task.py b/metaflow/task.py index 9990dd05e9d..3e8fd43b6d4 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -415,18 +415,6 @@ def run_step( self._init_foreach(step_name, join_type, inputs, split_index) # 4. initialize the current singleton -<<<<<<< HEAD - current._set_env(flow=self.flow, - flow_name=self.flow.name, - run_id=run_id, - step_name=step_name, - task_id=task_id, - retry_count=retry_count, - origin_run_id=origin_run_id, - namespace=resolve_identity(), - username=get_username(), - is_running=True) -======= current._set_env( flow=self.flow, run_id=run_id, @@ -438,7 +426,6 @@ def run_step( username=get_username(), is_running=True, ) ->>>>>>> master # 5. 
run task output.save_metadata( diff --git a/metaflow/tutorials/09-hellokfp/kfp_divmod.py b/metaflow/tutorials/09-hellokfp/kfp_divmod.py index 964a9f49719..f77d0b4d6ab 100644 --- a/metaflow/tutorials/09-hellokfp/kfp_divmod.py +++ b/metaflow/tutorials/09-hellokfp/kfp_divmod.py @@ -6,7 +6,9 @@ @func_to_container_op -def div_mod(dividend: int, divisor: int) -> NamedTuple("result", [('quotient', int), ('remainder', int)]): +def div_mod( + dividend: int, divisor: int +) -> NamedTuple("result", [("quotient", int), ("remainder", int)]): print(f"dividend={dividend}, divisor={divisor}") return divmod(dividend, divisor) diff --git a/metaflow/tutorials/09-hellokfp/kfp_graph_component_flow.py b/metaflow/tutorials/09-hellokfp/kfp_graph_component_flow.py index 132a5106723..bb1273e52f3 100644 --- a/metaflow/tutorials/09-hellokfp/kfp_graph_component_flow.py +++ b/metaflow/tutorials/09-hellokfp/kfp_graph_component_flow.py @@ -54,7 +54,10 @@ def start(self): self.next(self.end) - @kfp(preceding_component=my_recursive_component, preceding_component_inputs=["s3_root"]) + @kfp( + preceding_component=my_recursive_component, + preceding_component_inputs=["s3_root"], + ) @step def end(self): """ diff --git a/metaflow/tutorials/09-hellokfp/kfp_hellopipeler.py b/metaflow/tutorials/09-hellokfp/kfp_hellopipeler.py index 31a43e435f5..921e09685b9 100644 --- a/metaflow/tutorials/09-hellokfp/kfp_hellopipeler.py +++ b/metaflow/tutorials/09-hellokfp/kfp_hellopipeler.py @@ -4,17 +4,18 @@ def pipeler_op(run_id): - return dsl.ContainerOp( - name='pipelerstep1', - image='analytics-docker.artifactory.zgtools.net/artificial-intelligence/ai-platform/aip-py36-cpu-spark-jupyter:2.3.8254d0ef.spark-2-4', - command=['sh', '-c'], - arguments=[ - "spark-submit --master local --deploy-mode client --conf \"spark.eventLog.enabled=false\" " - "--class com.zillow.pipeler.orchestrator.example.ExampleTemplatizedConfigStep " - "https://artifactory.zgtools.net/artifactory/analytics-maven-local/com/zillow/pipeler/datalake-pipeler/2.3.0/datalake-pipeler-2.3.0.jar " - f"--sessionId {run_id}_execution " - ] - ) + return dsl.ContainerOp( + name="pipelerstep1", + image="analytics-docker.artifactory.zgtools.net/artificial-intelligence/ai-platform/aip-py36-cpu-spark-jupyter:2.3.8254d0ef.spark-2-4", + command=["sh", "-c"], + arguments=[ + 'spark-submit --master local --deploy-mode client --conf "spark.eventLog.enabled=false" ' + "--class com.zillow.pipeler.orchestrator.example.ExampleTemplatizedConfigStep " + "https://artifactory.zgtools.net/artifactory/analytics-maven-local/com/zillow/pipeler/datalake-pipeler/2.3.0/datalake-pipeler-2.3.0.jar " + f"--sessionId {run_id}_execution " + ], + ) + class HelloPipeler(FlowSpec): """ @@ -30,10 +31,7 @@ def start(self): self.run_id = current.run_id self.next(self.end) - @kfp( - preceding_component=pipeler_op, - preceding_component_inputs="run_id" - ) + @kfp(preceding_component=pipeler_op, preceding_component_inputs="run_id") @step def end(self): """ diff --git a/metaflow/tutorials/09-hellokfp/parameter_flow.py b/metaflow/tutorials/09-hellokfp/parameter_flow.py index 0e7d504844b..8f6e9c79390 100644 --- a/metaflow/tutorials/09-hellokfp/parameter_flow.py +++ b/metaflow/tutorials/09-hellokfp/parameter_flow.py @@ -14,24 +14,19 @@ class ParameterFlow(FlowSpec): """ alpha = Parameter( - 'alpha', - help='param with default', + "alpha", + help="param with default", default=0.01, ) - beta = Parameter( - 'beta', - help='param with no default', - type=int, - required=True - ) + beta = Parameter("beta", help="param with no 
default", type=int, required=True) host_name = Parameter( - 'host_name', - help='Deploy-time param evaluated at deployment', + "host_name", + help="Deploy-time param evaluated at deployment", type=str, default=get_host_name, - required=True + required=True, ) @step @@ -54,5 +49,5 @@ def end(self): print(f"Host name: {self.host_name}") -if __name__ == '__main__': +if __name__ == "__main__": ParameterFlow() diff --git a/metaflow/tutorials/09-hellokfp/resource_flow.py b/metaflow/tutorials/09-hellokfp/resource_flow.py index 2f39e6405f4..3b3ac9af6d3 100644 --- a/metaflow/tutorials/09-hellokfp/resource_flow.py +++ b/metaflow/tutorials/09-hellokfp/resource_flow.py @@ -7,6 +7,7 @@ class ResourceFlow(FlowSpec): The hello step uses @resource decorator that only works when kfp plug-in is used. """ + @step def start(self): """ @@ -18,9 +19,7 @@ def start(self): self.next(self.all_resource) @resources( - cpu=0.5, cpu_limit=5, - gpu=3, gpu_vendor="amd", - memory=150, memory_limit="1G" + cpu=0.5, cpu_limit=5, gpu=3, gpu_vendor="amd", memory=150, memory_limit="1G" ) @step def all_resource(self): @@ -67,5 +66,5 @@ def end(self): print("___________________________________________") -if __name__ == '__main__': +if __name__ == "__main__": ResourceFlow() diff --git a/metaflow/tutorials/10-pytorch/hello_pytorch.py b/metaflow/tutorials/10-pytorch/hello_pytorch.py index c301fa3ea6f..77067d8db01 100644 --- a/metaflow/tutorials/10-pytorch/hello_pytorch.py +++ b/metaflow/tutorials/10-pytorch/hello_pytorch.py @@ -33,7 +33,15 @@ def start(self): print(f"ranks: {self.ranks}") self.next(self.train, foreach="ranks") - @resources(cpu=1, cpu_limit=2, gpu="1", memory="2G", memory_limit="5G", volume="10G", volume_mode="ReadWriteMany") + @resources( + cpu=1, + cpu_limit=2, + gpu="1", + memory="2G", + memory_limit="5G", + volume="10G", + volume_mode="ReadWriteMany", + ) @step def train(self): """ @@ -43,6 +51,7 @@ def train(self): print("self.rank", self.rank) from models.train import train_model + self.model_state_dict = train_model( input_data_path=self.input_data_path, model_path=self.model_path, @@ -68,6 +77,7 @@ def evaluate(self, inputs): self.model_state_dict = train_input.model_state_dict from models.evaluate import evaluate_model + self.evaluate_results = evaluate_model( model_state_dict=self.model_state_dict, input_data_path=self.input_data_path, diff --git a/metaflow/tutorials/10-pytorch/models/evaluate.py b/metaflow/tutorials/10-pytorch/models/evaluate.py index 876052bf700..6cece7b595e 100644 --- a/metaflow/tutorials/10-pytorch/models/evaluate.py +++ b/metaflow/tutorials/10-pytorch/models/evaluate.py @@ -78,4 +78,4 @@ def evaluate_model( test_batch_size=512, test_accuracy_threshold=0.5, train_accuracy_threshold=0.5, - ) \ No newline at end of file + ) diff --git a/metaflow/util.py b/metaflow/util.py index 3d5ebc1efe1..6a42796edf2 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -167,14 +167,7 @@ def get_username(): could not be determined. 
""" # note: the order of the list matters -<<<<<<< HEAD - if METAFLOW_USER: - return METAFLOW_USER - - ENVVARS = ['METAFLOW_USER', 'SUDO_USER', 'USERNAME', 'USER'] -======= ENVVARS = ["METAFLOW_USER", "SUDO_USER", "USERNAME", "USER"] ->>>>>>> master for var in ENVVARS: user = os.environ.get(var) if user and user != "root": diff --git a/setup.py b/setup.py index e2b655c97fc..16ef8ee2e88 100644 --- a/setup.py +++ b/setup.py @@ -1,39 +1,10 @@ from setuptools import setup, find_packages -version = "2.5.1" +version = "2.5.2" -<<<<<<< HEAD -setup(name='zillow-metaflow', - version=version, - description='Metaflow: More Data Science, Less Engineering', - author='Machine Learning Infrastructure Team at Netflix', - author_email='help@metaflow.org', - license='Apache License 2.0', - packages=find_packages(exclude=['metaflow_test']), - py_modules=['metaflow', ], - package_data={'metaflow' : ['tutorials/*/*']}, - entry_points=''' - [console_scripts] - metaflow=metaflow.main_cli:main - ''', - install_requires = [ - 'click>=7.0,<8', - 'requests', - 'boto3', - 'pylint' - ], - tests_require = [ - 'coverage' - ], - extras_require = { - 'kfp': ['zillow-kfp', 'kfp-server-api'], - # Use an extras here as there is no "extras_tests_require" functionality :( - 'kfp-tests': ['pytest', 'pytest-xdist', 'pytest-cov', 'subprocess-tee'] - }) -======= setup( include_package_data=True, - name="metaflow", + name="zillow-metaflow", version=version, description="Metaflow: More Data Science, Less Engineering", author="Machine Learning Infrastructure Team at Netflix", @@ -53,5 +24,9 @@ "boto3", "pylint", ], + extras_require={ + "kfp": ["zillow-kfp", "kfp-server-api"], + # Use an extras here as there is no "extras_tests_require" functionality :( + "kfp-tests": ["pytest", "pytest-xdist", "pytest-cov", "subprocess-tee"], + }, ) ->>>>>>> master diff --git a/test/core/contexts.json b/test/core/contexts.json index 1142e7dffa1..d4c241f0cac 100644 --- a/test/core/contexts.json +++ b/test/core/contexts.json @@ -3,8 +3,6 @@ { "name": "python3-all-local", "disabled": false, -<<<<<<< HEAD -======= "env": { "METAFLOW_USER": "tester", "METAFLOW_RUN_BOOL_PARAM": "False", @@ -35,7 +33,6 @@ { "name": "dev-local", "disabled": true, ->>>>>>> master "env": { "METAFLOW_USER": "tester", "METAFLOW_RUN_BOOL_PARAM": "False", diff --git a/test/core/run_tests.py b/test/core/run_tests.py index 586bdb7c37d..f2ac6a5eca3 100644 --- a/test/core/run_tests.py +++ b/test/core/run_tests.py @@ -64,19 +64,12 @@ def log(msg, formatter=None, context=None, real_bad=False, real_good=False): def run_test(formatter, context, debug, checks, env_base): def run_cmd(mode): -<<<<<<< HEAD - cmd = [context['python'], '-B', 'test_flow.py'] - cmd.extend(context['top_options']) - cmd.extend((mode, '--run-id-file', 'run-id')) - cmd.extend(context['run_options']) - if debug: - print("cmd", cmd) -======= cmd = [context["python"], "-B", "test_flow.py"] cmd.extend(context["top_options"]) cmd.extend((mode, "--run-id-file", "run-id")) cmd.extend(context["run_options"]) ->>>>>>> master + if debug: + print("cmd", cmd) return cmd cwd = os.getcwd() @@ -90,16 +83,12 @@ def run_cmd(mode): with open("check_flow.py", "w") as f: f.write(formatter.check_code) -<<<<<<< HEAD if debug: print("tempdir", tempdir) - shutil.copytree(os.path.join(cwd, "metaflow_test"), os.path.join(tempdir, "metaflow_test")) -======= shutil.copytree( os.path.join(cwd, "metaflow_test"), os.path.join(tempdir, "metaflow_test") ) ->>>>>>> master path = os.path.join(tempdir, "test_flow.py") @@ -140,13 +129,10 @@ def 
run_cmd(mode): return pre_ret, path # run flow -<<<<<<< HEAD if debug: pprint.pprint(env) - flow_ret = subprocess.call(run_cmd('run'), env=env) -======= + flow_ret = subprocess.call(run_cmd("run"), env=env) ->>>>>>> master if flow_ret: if formatter.should_fail: log("Flow failed as expected.")
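# --- Illustrative sketch, not taken from the patch above ---------------------
# The resources_decorator.py hunk in this diff documents the KFP-oriented
# parameters (cpu/memory requests, gpu + gpu_vendor, volume, volume_mode,
# volume_dir). A minimal flow using them could look like the sketch below.
# The flow and step names here are hypothetical; the values follow the updated
# docstring: millicpu strings such as "500m" and memory units such as "2G" are
# meaningful to the KFP plugin, while AWS Batch expects an integer cpu count
# and memory in MB instead.
from metaflow import FlowSpec, resources, step


class ResourceSketchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.train)

    # Request half a CPU and 2 GB of memory, limit the step to one AMD GPU, and
    # attach a 5 GB volume (default mode "ReadWriteOnce", mounted at the default
    # volume_dir "/opt/metaflow_volume").
    @resources(cpu="500m", memory="2G", gpu=1, gpu_vendor="amd", volume="5G")
    @step
    def train(self):
        print("training with the resources requested above")
        self.next(self.end)

    @step
    def end(self):
        print("done")


if __name__ == "__main__":
    ResourceSketchFlow()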