From d204d5654a478bb0047e51513fc01c04af5ec898 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 9 Jul 2024 13:54:06 +0200 Subject: [PATCH] fix(ingestion/spark): Platform instance and column level lineage fix (#10843) Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../docs/sources/databricks/README.md | 2 +- .../config/DatahubOpenlineageConfig.java | 1 + .../converter/OpenLineageToDataHub.java | 49 +++++- .../openlineage/dataset/DatahubJob.java | 4 +- .../openlineage/dataset/PathSpec.java | 2 +- .../java/spark-lineage-beta/README.md | 31 +++- .../datahub/spark/conf/SparkConfigParser.java | 27 ++- .../spark/agent/util/RddPathUtils.java | 162 ++++++++++++++++++ .../spark/OpenLineageEventToDatahubTest.java | 156 +++++++++++++++++ .../redshift_mixed_case_lineage_spark.json | 147 ++++++++++++++++ 10 files changed, 556 insertions(+), 25 deletions(-) create mode 100644 metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java create mode 100644 metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json diff --git a/metadata-ingestion/docs/sources/databricks/README.md b/metadata-ingestion/docs/sources/databricks/README.md index 32b0b20c9480b8..a6cf39084c6abe 100644 --- a/metadata-ingestion/docs/sources/databricks/README.md +++ b/metadata-ingestion/docs/sources/databricks/README.md @@ -11,7 +11,7 @@ The alternative way to integrate is via the Hive connector. The [Hive starter re ## Databricks Spark -To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage/README.md#configuration-instructions-databricks). +To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage-beta/README.md#configuration-instructions-databricks). 
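For a quick illustration (not part of this change), the Spark agent is typically enabled through a handful of Spark properties. The snippet below is a minimal sketch that reuses the version and option names referenced elsewhere in this patch; the server URL and token are placeholders for your own deployment, and the Databricks-specific init-script setup is covered by the linked instructions:

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
spark.datahub.rest.token yourtoken
#On Databricks (and Glue) the onApplicationEnd event never fires, so enable coalesced emission explicitly
spark.datahub.stage_metadata_coalescing true
```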
## Watch the DataHub Talk at the Data and AI Summit 2022 diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index 71a18c8f9eff1d..5abb3c90d232bd 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -33,6 +33,7 @@ public class DatahubOpenlineageConfig { @Builder.Default private String hivePlatformAlias = "hive"; @Builder.Default private Map urnAliases = new HashMap<>(); @Builder.Default private final boolean disableSymlinkResolution = false; + @Builder.Default private final boolean lowerCaseDatasetUrns = false; public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index d1df27c3ffd116..1568451beff104 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -4,6 +4,7 @@ import com.linkedin.common.DataPlatformInstance; import com.linkedin.common.Edge; import com.linkedin.common.EdgeArray; +import com.linkedin.common.FabricType; import com.linkedin.common.GlobalTags; import com.linkedin.common.Owner; import com.linkedin.common.OwnerArray; @@ -57,6 +58,8 @@ import io.datahubproject.openlineage.dataset.DatahubJob; import io.datahubproject.openlineage.dataset.HdfsPathDataset; import io.datahubproject.openlineage.dataset.HdfsPlatform; +import io.datahubproject.openlineage.dataset.PathSpec; +import io.datahubproject.openlineage.utils.DatahubUtils; import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineageClientUtils; import java.io.IOException; @@ -151,6 +154,11 @@ public static Optional convertOpenlineageDatasetToDatasetUrn( private static Optional getDatasetUrnFromOlDataset( String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) { String platform; + if (mappingConfig.isLowerCaseDatasetUrns()) { + namespace = namespace.toLowerCase(); + datasetName = datasetName.toLowerCase(); + } + if (namespace.contains(SCHEME_SEPARATOR)) { try { URI datasetUri; @@ -183,12 +191,45 @@ private static Optional getDatasetUrnFromOlDataset( platform = namespace; } - if (mappingConfig.getCommonDatasetPlatformInstance() != null) { - datasetName = mappingConfig.getCommonDatasetPlatformInstance() + "." 
+ datasetName; + String platformInstance = getPlatformInstance(mappingConfig, platform); + FabricType env = getEnv(mappingConfig, platform); + return Optional.of(DatahubUtils.createDatasetUrn(platform, platformInstance, datasetName, env)); + } + + private static FabricType getEnv(DatahubOpenlineageConfig mappingConfig, String platform) { + FabricType fabricType = mappingConfig.getFabricType(); + if (mappingConfig.getPathSpecs() != null + && mappingConfig.getPathSpecs().containsKey(platform)) { + List path_specs = mappingConfig.getPathSpecs().get(platform); + for (PathSpec pathSpec : path_specs) { + if (pathSpec.getEnv().isPresent()) { + try { + fabricType = FabricType.valueOf(pathSpec.getEnv().get()); + return fabricType; + } catch (IllegalArgumentException e) { + log.warn("Invalid environment value: {}", pathSpec.getEnv()); + } + } + } } + return fabricType; + } - return Optional.of( - new DatasetUrn(new DataPlatformUrn(platform), datasetName, mappingConfig.getFabricType())); + private static String getPlatformInstance( + DatahubOpenlineageConfig mappingConfig, String platform) { + // Use the platform instance from the path spec if it is present otherwise use the one from the + // commonDatasetPlatformInstance + String platformInstance = mappingConfig.getCommonDatasetPlatformInstance(); + if (mappingConfig.getPathSpecs() != null + && mappingConfig.getPathSpecs().containsKey(platform)) { + List path_specs = mappingConfig.getPathSpecs().get(platform); + for (PathSpec pathSpec : path_specs) { + if (pathSpec.getPlatformInstance().isPresent()) { + return pathSpec.getPlatformInstance().get(); + } + } + } + return platformInstance; } public static GlobalTags generateTags(List tags) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 85eaeb445d7bda..60caaae359677f 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -280,11 +280,11 @@ private Pair processDownstreams( for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - downstream, + upstream, fineGrainedLineage.getConfidenceScore(), StringUtils.defaultIfEmpty( fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - upstream, + downstream, null); } } diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/PathSpec.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/PathSpec.java index 1a015cabc46cc5..b73b1763216036 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/PathSpec.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/PathSpec.java @@ -14,7 +14,7 @@ public class PathSpec { final String alias; final String platform; - @Builder.Default final String env = "PROD"; + @Builder.Default final Optional env = Optional.empty(); final List pathSpecList; @Builder.Default final Optional platformInstance = Optional.empty(); } diff --git a/metadata-integration/java/spark-lineage-beta/README.md 
b/metadata-integration/java/spark-lineage-beta/README.md index a4d90b25c27bf4..a643919664b079 100644 --- a/metadata-integration/java/spark-lineage-beta/README.md +++ b/metadata-integration/java/spark-lineage-beta/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.13 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.13") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.11") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.13") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -159,7 +159,7 @@ information like tokens. | Field | Required | Default | Description | |---------------------------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 | +| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.13 | | spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | | spark.datahub.rest.server | ✅ | | Datahub server url eg: | | spark.datahub.rest.token | | | Authentication token. | @@ -181,9 +181,10 @@ information like tokens. | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. 
Example `year=.*/month=.*/day=.*` | | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | | spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run . | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. | -| +| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event, which is never called on Databricks or on Glue. You should enable this on Databricks if you want a coalesced run. | +| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. | +| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset URNs. By default, it is disabled. | +| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the S3 location instead of the Hive table. By default, it is disabled. | ## What to Expect: The Metadata Model @@ -343,3 +344,15 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b ``` ## Known limitations ++ +## Changelog + +### Version 0.2.13 +- Silencing some chatty warnings in RddPathUtils + +### Version 0.2.12 + +- Add option to lowercase dataset URNs +- Add option to set platform instance and/or env per platform with `spark.datahub.platform.<platform>.env` and `spark.datahub.platform.<platform>.platform_instance` config parameters +- Fix platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set +- Fix column level lineage support when patch is enabled diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java index f9dd6c127e1b98..c854861af2f81d 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -42,6 +42,8 @@ public class SparkConfigParser { public static final String DATAHUB_FLOW_NAME = "flow_name"; public static final String DATASET_ENV_KEY = "metadata.dataset.env"; public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias"; + public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns"; + public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize"; public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance"; public static final String DATASET_INCLUDE_SCHEMA_METADATA = @@ -152,6 +154,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig)); builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig)); builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig)); + builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig)); try { String parentJob =
SparkConfigParser.getParentJobKey(sparkConfig); if (parentJob != null) { @@ -246,15 +249,18 @@ public static Map> getPathSpecListMap(Config datahubConfi pathSpecBuilder.alias(pathSpecKey); pathSpecBuilder.platform(key); if (datahubConfig.hasPath(aliasKey + ".env")) { - pathSpecBuilder.env(datahubConfig.getString(aliasKey + ".env")); + pathSpecBuilder.env(Optional.ofNullable(datahubConfig.getString(aliasKey + ".env"))); } - if (datahubConfig.hasPath(aliasKey + ".platformInstance")) { + if (datahubConfig.hasPath(aliasKey + "." + PLATFORM_INSTANCE_KEY)) { pathSpecBuilder.platformInstance( - Optional.ofNullable(datahubConfig.getString(aliasKey + ".platformInstance"))); + Optional.ofNullable( + datahubConfig.getString(aliasKey + "." + PLATFORM_INSTANCE_KEY))); + } + if (datahubConfig.hasPath(aliasKey + "." + PATH_SPEC_LIST_KEY)) { + pathSpecBuilder.pathSpecList( + Arrays.asList( + datahubConfig.getString(aliasKey + "." + PATH_SPEC_LIST_KEY).split(","))); } - pathSpecBuilder.pathSpecList( - Arrays.asList(datahubConfig.getString(aliasKey + "." + pathSpecKey).split(","))); - platformSpecs.add(pathSpecBuilder.build()); } pathSpecMap.put(key, platformSpecs); @@ -264,8 +270,8 @@ public static Map> getPathSpecListMap(Config datahubConfi } public static String getPlatformInstance(Config pathSpecConfig) { - return pathSpecConfig.hasPath(PLATFORM_INSTANCE_KEY) - ? pathSpecConfig.getString(PLATFORM_INSTANCE_KEY) + return pathSpecConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY) + ? pathSpecConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) : null; } @@ -341,4 +347,9 @@ public static boolean isEmitCoalescePeriodically(Config datahubConfig) { return datahubConfig.hasPath(STAGE_METADATA_COALESCING) && datahubConfig.getBoolean(STAGE_METADATA_COALESCING); } + + public static boolean isLowerCaseDatasetUrns(Config datahubConfig) { + return datahubConfig.hasPath(DATASET_LOWERCASE_URNS) + && datahubConfig.getBoolean(DATASET_LOWERCASE_URNS); + } } diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java new file mode 100644 index 00000000000000..62005bf15f8505 --- /dev/null +++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java @@ -0,0 +1,162 @@ +/* +/* Copyright 2018-2024 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.agent.util; + +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.spark.package$; +import org.apache.spark.rdd.HadoopRDD; +import org.apache.spark.rdd.MapPartitionsRDD; +import org.apache.spark.rdd.ParallelCollectionRDD; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.execution.datasources.FileScanRDD; +import scala.Tuple2; +import scala.collection.immutable.Seq; + +/** Utility class to extract paths from RDD nodes. 
*/ +@Slf4j +public class RddPathUtils { + + public static Stream findRDDPaths(RDD rdd) { + return Stream.of( + new HadoopRDDExtractor(), + new FileScanRDDExtractor(), + new MapPartitionsRDDExtractor(), + new ParallelCollectionRDDExtractor()) + .filter(e -> e.isDefinedAt(rdd)) + .findFirst() + .orElse(new UnknownRDDExtractor()) + .extract(rdd) + .filter(p -> p != null); + } + + static class UnknownRDDExtractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return true; + } + + @Override + public Stream extract(RDD rdd) { + // Change to debug to silence error + log.debug("Unknown RDD class {}", rdd); + return Stream.empty(); + } + } + + static class HadoopRDDExtractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof HadoopRDD; + } + + @Override + public Stream extract(HadoopRDD rdd) { + org.apache.hadoop.fs.Path[] inputPaths = FileInputFormat.getInputPaths(rdd.getJobConf()); + Configuration hadoopConf = rdd.getConf(); + return Arrays.stream(inputPaths).map(p -> PlanUtils.getDirectoryPath(p, hadoopConf)); + } + } + + static class MapPartitionsRDDExtractor implements RddPathExtractor { + + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof MapPartitionsRDD; + } + + @Override + public Stream extract(MapPartitionsRDD rdd) { + return findRDDPaths(rdd.prev()); + } + } + + static class FileScanRDDExtractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof FileScanRDD; + } + + @Override + @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") + public Stream extract(FileScanRDD rdd) { + return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream() + .flatMap(fp -> Arrays.stream(fp.files())) + .map( + f -> { + if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) { + // filePath returns SparkPath for Spark 3.4 + return ReflectionUtils.tryExecuteMethod(f, "filePath") + .map(o -> ReflectionUtils.tryExecuteMethod(o, "toPath")) + .map(o -> (Path) o.get()) + .get() + .getParent(); + } else { + return parentOf(f.filePath()); + } + }); + } + } + + static class ParallelCollectionRDDExtractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof ParallelCollectionRDD; + } + + @Override + public Stream extract(ParallelCollectionRDD rdd) { + try { + Object data = FieldUtils.readField(rdd, "data", true); + log.debug("ParallelCollectionRDD data: {}", data); + if (data instanceof Seq) { + return ScalaConversionUtils.fromSeq((Seq) data).stream() + .map( + el -> { + Path path = null; + if (el instanceof Tuple2) { + // we're able to extract path + path = parentOf(((Tuple2) el)._1.toString()); + log.debug("Found input {}", path); + } else { + // Change to debug to silence error + log.debug("unable to extract Path from {}", el.getClass().getCanonicalName()); + } + return path; + }) + .filter(Objects::nonNull); + } else { + // Changed to debug to silence error + log.debug("Cannot extract path from ParallelCollectionRDD {}", data); + } + } catch (IllegalAccessException | IllegalArgumentException e) { + // Changed to debug to silence error + log.debug("Cannot read data field from ParallelCollectionRDD {}", rdd); + } + return Stream.empty(); + } + } + + private static Path parentOf(String path) { + try { + return new Path(path).getParent(); + } catch (Exception e) { + return null; + } + } + + interface RddPathExtractor { + boolean isDefinedAt(Object rdd); + + Stream extract(T 
rdd); + } +} diff --git a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java index 82fa0655a93e0c..b42c87618d2b62 100644 --- a/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java +++ b/metadata-integration/java/spark-lineage-beta/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java @@ -13,12 +13,14 @@ import io.datahubproject.openlineage.converter.OpenLineageToDataHub; import io.datahubproject.openlineage.dataset.DatahubDataset; import io.datahubproject.openlineage.dataset.DatahubJob; +import io.datahubproject.openlineage.dataset.PathSpec; import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineageClientUtils; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Optional; import java.util.stream.Stream; import junit.framework.TestCase; @@ -598,4 +600,158 @@ public void testProcessRedshiftOutput() throws URISyntaxException, IOException { dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); } } + + public void testProcessRedshiftOutputWithPlatformInstance() + throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.commonDatasetPlatformInstance("my-platform-instance"); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:mysql,my-platform-instance.datahub.metadata_aspect_v2,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)", + dataset.getUrn().toString()); + assertEquals( + dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); + } + } + + public void testProcessRedshiftOutputWithPlatformSpecificPlatformInstance() + throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.pathSpecs( + new HashMap>() { + { + put( + "redshift", + List.of( + PathSpec.builder() + .platform("redshift") + .platformInstance(Optional.of("my-platform-instance")) + .build())); + } + }); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, 
builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)", + dataset.getUrn().toString()); + assertEquals( + dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); + } + } + + public void testProcessRedshiftOutputWithPlatformSpecificEnv() + throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.pathSpecs( + new HashMap>() { + { + put( + "redshift", + List.of(PathSpec.builder().platform("redshift").env(Optional.of("PROD")).build())); + } + }); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,PROD)", + dataset.getUrn().toString()); + assertEquals( + dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); + } + } + + public void testProcessRedshiftOutputLowercasedUrns() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.hivePlatformAlias("glue"); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.lowerCaseDatasetUrns(true); + + String olEvent = + IOUtils.toString( + this.getClass() + .getResourceAsStream("/ol_events/redshift_mixed_case_lineage_spark.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)", + dataset.getUrn().toString()); + } + for (DatahubDataset dataset : datahubJob.getOutSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV)", + dataset.getUrn().toString()); + assertEquals( + dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift"); + } + } } diff --git a/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json new file mode 100644 index 00000000000000..692eebe64bb5cb --- /dev/null +++ 
b/metadata-integration/java/spark-lineage-beta/src/test/resources/ol_events/redshift_mixed_case_lineage_spark.json @@ -0,0 +1,147 @@ +{ + "eventTime": "2024-06-18T06:52:21.64Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "COMPLETE", + "run": { + "runId": "01902a1e-371a-7dbf-8098-2337d441e8dc", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853" + }, + "job": { + "namespace": "default", + "name": "jdbc_test_demo" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.3.4", + "name": "spark" + }, + "environment-properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "environment-properties": {} + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "local[*]", + "spark.app.name": "JdbcTest-Demo" + } + } + } + }, + "job": { + "namespace": "default", + "name": "jdbc_test_demo.execute_save_into_data_source_command.spark_redshift_load_test", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "SQL_JOB" + } + } + }, + "inputs": [ + { + "namespace": "mysql://localhost:3306", + "name": "DataHub.Metadata_Aspect_V2", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "mysql://localhost:3306", + "uri": "mysql://localhost:3306" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + "type": "string" + }, + { + "name": "aspect", + "type": "string" + }, + { + "name": "version", + "type": "long" + }, + { + "name": "metadata", + "type": "string" + }, + { + "name": "systemmetadata", + "type": "string" + }, + { + "name": "createdon", + "type": "timestamp" + }, + { + "name": "createdby", + "type": "string" + }, + { + "name": "createdfor", + "type": "string" + } + ] + } + }, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "name": "DEV.PuBliC.SparK_RedshifT_Load_Test", + "facets": { + "dataSource": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": 
"https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet", + "name": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439", + "uri": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439" + }, + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet", + "fields": [ + { + "name": "urn", + "type": "string" + } + ] + }, + "columnLineage": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet", + "fields": { + "urn": { + "inputFields": [ + { + "namespace": "mysql://localhost:3306", + "name": "datahub.metadata_aspect_v2", + "field": "urn" + } + ] + } + } + } + }, + "outputFacets": {} + } + ] +} \ No newline at end of file