Fix handling of cos_object_prefix pipeline property (#2972)
kiersten-stokes authored Oct 20, 2022
1 parent 4f78ac4 commit 682f0ff
Showing 10 changed files with 174 additions and 71 deletions.
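In short: current pipeline files carry `cos_object_prefix` inside the `pipeline_defaults` stanza, so this commit renames `Pipeline.pipeline_parameters` to `pipeline_properties`, populates it from those defaults, and migrates any legacy top-level `cos_object_prefix` into `pipeline_defaults` so the prefix reaches the generated object-storage paths again. The sketch below illustrates the two property layouts the change reconciles; the dictionaries are illustrative stand-ins for the relevant slice of a pipeline file's properties, not the full schema.

```python
# Illustrative slices of a pipeline's properties stanza (not the full Elyra schema).

# Older files: the prefix sits at the top level of the properties stanza.
legacy_properties = {
    "name": "untitled",
    "cos_object_prefix": "project/artifacts",
    "pipeline_defaults": {"runtime_image": "tensorflow/tensorflow:2.8.0"},
}

# Current files: the prefix lives inside pipeline_defaults, which is what
# Pipeline.pipeline_properties is now populated from.
current_properties = {
    "name": "untitled",
    "pipeline_defaults": {
        "cos_object_prefix": "project/artifacts",
        "runtime_image": "tensorflow/tensorflow:2.8.0",
    },
}
# Either layout should yield "project/artifacts" once the migration in
# pipeline_definition.py (see below) has run.
```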
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/processor_airflow.py
@@ -188,7 +188,7 @@ def process(self, pipeline: Pipeline) -> "AirflowPipelineProcessorResponse":
if pipeline.contains_generic_operations():
object_storage_url = f"{cos_endpoint}"
os_path = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
object_storage_path = f"/{cos_bucket}/{os_path}"
else:
@@ -253,7 +253,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance

pipeline_instance_id = pipeline_instance_id or pipeline_name
artifact_object_prefix = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)

self.log_pipeline_info(
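For the generic-operation case, the prefix and the pipeline instance id are joined and anchored under the bucket to form the object-storage path. A minimal, hedged sketch of that composition; the local join_paths below only approximates the helper imported in processor_airflow.py, and the bucket/prefix/instance values are hypothetical.

```python
from typing import Optional


def join_paths(prefix: Optional[str], instance_id: str) -> str:
    """Stand-in for the join_paths helper used above; its behavior is assumed
    (join non-empty segments with '/'), not copied from this diff."""
    return "/".join(part.strip("/") for part in (prefix, instance_id) if part)


cos_bucket = "my-bucket"                      # hypothetical runtime-configuration values
cos_object_prefix = "project/artifacts"       # value of the cos_object_prefix pipeline default
pipeline_instance_id = "untitled-0125163532"  # pipeline name plus an injected timestamp

os_path = join_paths(cos_object_prefix, pipeline_instance_id)
object_storage_path = f"/{cos_bucket}/{os_path}"
print(object_storage_path)  # /my-bucket/project/artifacts/untitled-0125163532

# Without a prefix, the path falls back to /my-bucket/untitled-0125163532
assert f"/{cos_bucket}/{join_paths(None, pipeline_instance_id)}" == "/my-bucket/untitled-0125163532"
```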
26 changes: 21 additions & 5 deletions elyra/pipeline/component_parameter.py
@@ -139,31 +139,47 @@ def build_property_map(cls) -> None:
"""Build the map of property subclasses."""
cls._subclass_property_map = {sc.property_id: sc for sc in cls.all_subclasses() if hasattr(sc, "property_id")}

@classmethod
def get_class_for_property(cls, prop_id) -> type | None:
"""Returns the ElyraProperty subclass corresponding to the given property id."""
if not cls._subclass_property_map:
cls.build_property_map()
return cls._subclass_property_map.get(prop_id)

@classmethod
def subclass_exists_for_property(cls, prop_id: str) -> bool:
"""
Returns a boolean indicating whether a corresponding ElyraProperty subclass
exists for the given property id.
"""
return cls.get_class_for_property(prop_id) is not None

@classmethod
def get_single_instance(cls, value: Optional[Dict[str, Any]] = None) -> ElyraProperty | None:
"""Unpack values from dictionary object and instantiate a class instance."""
if isinstance(value, ElyraProperty):
return value # value is already a single instance, no further action required

if not isinstance(value, dict):
value = {}

params = {attr.id: cls.strip_if_string(value.get(attr.id)) for attr in cls.property_attributes}
instance = getattr(import_module(cls.__module__), cls.__name__)(**params)
return None if instance.should_discard() else instance

@classmethod
def create_instance(cls, prop_id: str, value: Optional[Any]) -> ElyraProperty | ElyraPropertyList | None:
"""Create an instance of a class with the given property id using the user-entered values."""
if not cls._subclass_property_map:
cls.build_property_map()
sc = cls.get_class_for_property(prop_id)
if sc is None:
return None

sc = cls._subclass_property_map.get(prop_id)
if issubclass(sc, ElyraPropertyListItem):
if not isinstance(value, list):
return None
instances = [sc.get_single_instance(obj) for obj in value] # create instance for each object
return ElyraPropertyList(instances).deduplicate() # convert to ElyraPropertyList and de-dupe
elif issubclass(sc, ElyraProperty):
return sc.get_single_instance(value)

return None

@classmethod
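The new get_class_for_property / subclass_exists_for_property classmethods centralize the property-id to subclass lookup that create_instance and the conversion logic in pipeline_definition.py now share. Below is a self-contained sketch of the same registry pattern, using toy classes rather than the real ElyraProperty hierarchy (the real base class walks all nested subclasses and unpacks attribute dictionaries, which is omitted here).

```python
from typing import Any, Dict, Optional, Type


class ToyProperty:
    """Toy base class mirroring the lookup pattern; not the real ElyraProperty."""

    _subclass_property_map: Dict[str, Type["ToyProperty"]] = {}

    def __init__(self, value: Any):
        self.value = value

    @classmethod
    def build_property_map(cls) -> None:
        # Register every direct subclass that declares a property_id.
        cls._subclass_property_map = {
            sc.property_id: sc for sc in cls.__subclasses__() if hasattr(sc, "property_id")
        }

    @classmethod
    def get_class_for_property(cls, prop_id: str) -> Optional[Type["ToyProperty"]]:
        if not cls._subclass_property_map:
            cls.build_property_map()
        return cls._subclass_property_map.get(prop_id)

    @classmethod
    def subclass_exists_for_property(cls, prop_id: str) -> bool:
        return cls.get_class_for_property(prop_id) is not None

    @classmethod
    def create_instance(cls, prop_id: str, value: Any) -> Optional["ToyProperty"]:
        sc = cls.get_class_for_property(prop_id)
        return None if sc is None else sc(value)


class SharedMemSize(ToyProperty):
    property_id = "kubernetes_shared_mem_size"


assert ToyProperty.subclass_exists_for_property("kubernetes_shared_mem_size")
assert ToyProperty.create_instance("runtime_image", "tensorflow:2.8.0") is None  # no subclass, skipped
```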
4 changes: 2 additions & 2 deletions elyra/pipeline/kfp/processor_kfp.py
@@ -380,7 +380,7 @@ def process(self, pipeline):
if pipeline.contains_generic_operations():
object_storage_url = f"{cos_public_endpoint}"
os_path = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
object_storage_path = f"/{cos_bucket}/{os_path}"
else:
@@ -486,7 +486,7 @@ def _cc_pipeline(
pipeline_instance_id = pipeline_instance_id or pipeline_name

artifact_object_prefix = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)

self.log_pipeline_info(
2 changes: 1 addition & 1 deletion elyra/pipeline/parser.py
@@ -61,7 +61,7 @@ def parse(self, pipeline_json: Dict) -> Pipeline:
runtime_config=runtime_config,
source=source,
description=description,
pipeline_parameters=primary_pipeline.pipeline_parameters,
pipeline_properties=primary_pipeline.get_pipeline_default_properties(),
)

nodes = primary_pipeline.nodes
12 changes: 6 additions & 6 deletions elyra/pipeline/pipeline.py
@@ -341,7 +341,7 @@ def __init__(
runtime_config: Optional[str] = None,
source: Optional[str] = None,
description: Optional[str] = None,
pipeline_parameters: Optional[Dict[str, Any]] = None,
pipeline_properties: Optional[Dict[str, Any]] = None,
):
"""
:param id: Generated UUID, 128 bit number used as a unique identifier
@@ -351,7 +351,7 @@ def __init__(
:param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
:param source: The pipeline source, e.g. a pipeline file or a notebook.
:param description: Pipeline description
:param pipeline_parameters: Key/value pairs representing the parameters of this pipeline
:param pipeline_properties: Key/value pairs representing the properties of this pipeline
"""

if not name:
@@ -365,7 +365,7 @@ def __init__(
self._source = source
self._runtime = runtime
self._runtime_config = runtime_config
self._pipeline_parameters = pipeline_parameters or {}
self._pipeline_properties = pipeline_properties or {}
self._operations = {}

@property
@@ -395,11 +395,11 @@ def runtime_config(self) -> str:
return self._runtime_config

@property
def pipeline_parameters(self) -> Dict[str, Any]:
def pipeline_properties(self) -> Dict[str, Any]:
"""
The dictionary of global parameters associated with each node of the pipeline
The dictionary of global properties associated with this pipeline
"""
return self._pipeline_parameters
return self._pipeline_properties

@property
def operations(self) -> Dict[str, Operation]:
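With the property renamed, the Airflow and KFP processors read the prefix through the new accessor. A hedged usage sketch follows, assuming Elyra is installed at a version containing this change; the constructor values are placeholders, not taken from a real pipeline file.

```python
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX

# Hypothetical values throughout; the id/name/runtime/runtime_config arguments are
# placeholders and depend on the local runtime configuration.
pipeline = Pipeline(
    id="0f9b6e2c-1234-4e1a-9c2d-abcdef012345",
    name="untitled",
    runtime="kfp",
    runtime_config="my_kubeflow_runtime",
    pipeline_properties={COS_OBJECT_PREFIX: "project/artifacts"},
)

# The lookup both processors now perform when composing artifact paths:
prefix = pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)
assert prefix == "project/artifacts"
```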
4 changes: 1 addition & 3 deletions elyra/pipeline/pipeline_constants.py
@@ -24,6 +24,4 @@
KUBERNETES_POD_LABELS = "kubernetes_pod_labels"
DISABLE_NODE_CACHING = "disable_node_caching"
KUBERNETES_SHARED_MEM_SIZE = "kubernetes_shared_mem_size"
PIPELINE_META_PROPERTIES = ["name", "description", "runtime"]
# optional static prefix to be used when generating an object name for object storage
COS_OBJECT_PREFIX = "cos_object_prefix"
COS_OBJECT_PREFIX = "cos_object_prefix" # optional static prefix to be used when generating object name for cos storage
76 changes: 33 additions & 43 deletions elyra/pipeline/pipeline_definition.py
@@ -31,10 +31,11 @@
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.component_parameter import ElyraPropertyList
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline_constants import ENV_VARIABLES, RUNTIME_IMAGE
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS
from elyra.pipeline.pipeline_constants import PIPELINE_META_PROPERTIES
from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
from elyra.pipeline.runtime_type import RuntimeProcessorType


@@ -183,23 +184,19 @@ def comments(self) -> list:
"""
return self._node["app_data"]["ui_data"].get("comments", [])

@property
def pipeline_parameters(self) -> Dict[str, Any]:
"""
Retrieve pipeline parameters, which are defined as all
key/value pairs in the 'properties' stanza that are not
either pipeline meta-properties (e.g. name, description,
and runtime) or the pipeline defaults dictionary
"""
all_properties = self._node["app_data"].get("properties", {})
excluded_properties = PIPELINE_META_PROPERTIES + [PIPELINE_DEFAULTS]
def get_pipeline_default_properties(self) -> Dict[str, Any]:
"""Retrieve the dictionary of pipeline default properties"""
pipeline_defaults = self.get_property(PIPELINE_DEFAULTS, {})

pipeline_parameters = {}
for property_name, value in all_properties.items():
if property_name not in excluded_properties:
pipeline_parameters[property_name] = value
# TODO remove the block below when a pipeline migration is appropriate (after 3.13)
cos_prefix = self._node["app_data"].get("properties", {}).pop(COS_OBJECT_PREFIX, None)
if cos_prefix:
if PIPELINE_DEFAULTS in self._node["app_data"]["properties"]:
self._node["app_data"]["properties"][PIPELINE_DEFAULTS][COS_OBJECT_PREFIX] = cos_prefix
else:
self._node["app_data"]["properties"][PIPELINE_DEFAULTS] = {COS_OBJECT_PREFIX: cos_prefix}

return pipeline_parameters
return pipeline_defaults

def get_property(self, key: str, default_value=None) -> Any:
"""
@@ -233,18 +230,16 @@ def convert_elyra_owned_properties(self) -> None:
Convert select pipeline-level properties to their corresponding dataclass
object type. No validation is performed.
"""
pipeline_defaults = self.get_property(PIPELINE_DEFAULTS, {})
for param_id, param_value in list(pipeline_defaults.items()):
if param_id == RUNTIME_IMAGE:
continue # runtime image is the only pipeline default that does not need to be converted
if isinstance(param_value, (ElyraProperty, ElyraPropertyList)) or param_value is None:
continue # property has already been properly converted or cannot be converted

converted_value = ElyraProperty.create_instance(param_id, param_value)
if converted_value is not None:
pipeline_defaults[param_id] = converted_value
pipeline_defaults = self.get_pipeline_default_properties()
for prop_id, value in list(pipeline_defaults.items()):
if not ElyraProperty.subclass_exists_for_property(prop_id):
continue

converted_value = ElyraProperty.create_instance(prop_id, value)
if converted_value is None:
pipeline_defaults.pop(prop_id)
else:
del pipeline_defaults[param_id]
pipeline_defaults[prop_id] = converted_value


class Node(AppDataBase):
@@ -314,9 +309,7 @@ def component_source(self) -> Optional[str]:

@property
def is_generic(self) -> True:
"""
A property that denotes whether this node is a generic component
"""
"""A property that denotes whether this node is a generic component"""
if Operation.is_generic_operation(self.op):
return True
return False
@@ -400,9 +393,7 @@ def pop_component_parameter(self, key: str, default: Optional[Any] = None) -> An
return self._node["app_data"]["component_parameters"].pop(key, default)

def get_all_component_parameters(self) -> Dict[str, Any]:
"""
Retrieve all component parameter key-value pairs.
"""
"""Retrieve all component parameter key-value pairs."""
return self._node["app_data"]["component_parameters"]

def remove_env_vars_with_matching_secrets(self):
@@ -421,16 +412,15 @@ def convert_elyra_owned_properties(self) -> None:
Convert select node-level list properties to their corresponding dataclass
object type. No validation is performed.
"""
for param_id in self.elyra_owned_properties:
param_value = self.get_component_parameter(param_id)
if isinstance(param_value, (ElyraProperty, ElyraPropertyList)) or param_value is None:
continue # property has already been properly converted or cannot be converted
for prop_id in self.elyra_owned_properties:
if not ElyraProperty.subclass_exists_for_property(prop_id):
continue

converted_value = ElyraProperty.create_instance(param_id, param_value)
if converted_value is not None:
self.set_component_parameter(param_id, converted_value)
converted_value = ElyraProperty.create_instance(prop_id, value=self.get_component_parameter(prop_id))
if converted_value is None:
self.pop_component_parameter(prop_id)
else:
self.pop_component_parameter(param_id)
self.set_component_parameter(prop_id, converted_value)


class PipelineDefinition(object):
@@ -607,7 +597,7 @@ def propagate_pipeline_default_properties(self):
"""
self.primary_pipeline.convert_elyra_owned_properties()

pipeline_default_properties = self.primary_pipeline.get_property(PIPELINE_DEFAULTS, {})
pipeline_default_properties = self.primary_pipeline.get_pipeline_default_properties()
for node in self.pipeline_nodes:
# Determine which Elyra-owned properties will require dataclass conversion, then convert
node.set_elyra_owned_properties(self.primary_pipeline.type)
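The TODO-guarded block inside get_pipeline_default_properties() is what keeps pre-3.13 pipeline files working: a cos_object_prefix still sitting at the top level of the properties stanza is popped and folded into pipeline_defaults before the defaults are propagated to nodes. Here is a standalone sketch of that merge, operating on a plain dictionary instead of the app_data node (so it omits the get_property indirection).

```python
from typing import Any, Dict

COS_OBJECT_PREFIX = "cos_object_prefix"
PIPELINE_DEFAULTS = "pipeline_defaults"


def migrate_cos_prefix(properties: Dict[str, Any]) -> Dict[str, Any]:
    """Simplified mirror of the TODO-guarded block in get_pipeline_default_properties():
    move a legacy top-level cos_object_prefix into pipeline_defaults, then return the defaults."""
    cos_prefix = properties.pop(COS_OBJECT_PREFIX, None)
    if cos_prefix:
        properties.setdefault(PIPELINE_DEFAULTS, {})[COS_OBJECT_PREFIX] = cos_prefix
    return properties.get(PIPELINE_DEFAULTS, {})


legacy = {"name": "untitled", COS_OBJECT_PREFIX: "project/artifacts"}
defaults = migrate_cos_prefix(legacy)
assert defaults == {COS_OBJECT_PREFIX: "project/artifacts"}
assert COS_OBJECT_PREFIX not in legacy  # the top-level copy is removed, matching the pop() above
```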
23 changes: 17 additions & 6 deletions elyra/tests/pipeline/airflow/test_processor_airflow.py
@@ -31,6 +31,7 @@
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.tests.pipeline.test_pipeline_parser import _read_pipeline_resource
@@ -153,8 +154,11 @@ def test_pipeline_process(monkeypatch, processor, parsed_pipeline, sample_metada

assert response.run_url == sample_metadata["metadata"]["api_endpoint"]
assert response.object_storage_url == sample_metadata["metadata"]["cos_endpoint"]
# Verifies that only this substring is in the storage path since a timestamp is injected into the name
assert "/" + sample_metadata["metadata"]["cos_bucket"] + "/" + "untitled" in response.object_storage_path

# Verifies cos_object_prefix is added to storage path and that the correct substring is
# in the storage path since a timestamp is injected into the name
cos_prefix = parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)
assert f"/{sample_metadata['metadata']['cos_bucket']}/{cos_prefix}/untitled" in response.object_storage_path


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
Expand All @@ -172,6 +176,10 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

# Ensure the value of COS_OBJECT_PREFIX has been propagated to the Pipeline object appropriately
cos_prefix = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(COS_OBJECT_PREFIX)
assert cos_prefix == parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)

with tempfile.TemporaryDirectory() as temp_dir:
export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

@@ -206,12 +214,15 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
# Gets sub-list slice starting where the Notebook Op starts
init_line = i + 1
for idx, line in enumerate(file_as_lines[init_line:], start=init_line):
if "--cos-endpoint" in line:
assert f"--cos-endpoint {sample_metadata['metadata']['cos_endpoint']}" in line
if "--cos-bucket" in line:
assert f"--cos-bucket {sample_metadata['metadata']['cos_bucket']}" in line
if "--cos-directory" in line:
assert f"--cos-directory '{cos_prefix}/some-instance-id'" in line

if "namespace=" in line:
assert sample_metadata["metadata"]["user_namespace"] == read_key_pair(line)["value"]
elif "cos_endpoint=" in line:
assert sample_metadata["metadata"]["cos_endpoint"] == read_key_pair(line)["value"]
elif "cos_bucket=" in line:
assert sample_metadata["metadata"]["cos_bucket"] == read_key_pair(line)["value"]
elif "name=" in line and "Volume" not in file_as_lines[idx - 1]:
assert node["app_data"]["ui_data"]["label"] == read_key_pair(line)["value"]
elif "notebook=" in line:
(Diffs for the remaining two of the ten changed files were not loaded in this view.)
