fix(spark): Finegrained lineage is emitted on the DataJob and not on the emitted Datasets. #11956

Merged: 15 commits, Jan 6, 2025
101 changes: 58 additions & 43 deletions metadata-integration/java/acryl-spark-lineage/README.md

Large diffs are not rendered by default.

@@ -1,12 +1,18 @@
package datahub.spark;

import static com.linkedin.metadata.Constants.*;
import static datahub.spark.converter.SparkStreamingEventToDatahub.*;
import static io.datahubproject.openlineage.converter.OpenLineageToDataHub.*;
import static io.datahubproject.openlineage.utils.DatahubUtils.*;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.RunResultType;
@@ -62,12 +68,23 @@ public class DatahubEventEmitter extends EventEmitter {
private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
private SparkLineageConf datahubConf;
private static final int DEFAULT_TIMEOUT_SEC = 10;
private final ObjectMapper objectMapper;
private final JacksonDataTemplateCodec dataTemplateCodec;

private final EventFormatter eventFormatter = new EventFormatter();

public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName)
throws URISyntaxException {
super(config, applicationJobName);
objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
int maxSize =
Integer.parseInt(
System.getenv()
.getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
objectMapper
.getFactory()
.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());
dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
}

private Optional<Emitter> getEmitter() {
@@ -407,7 +424,14 @@ protected void emitMcps(List<MetadataChangeProposal> mcps) {
.map(
mcp -> {
try {
log.info("emitting mcpw: " + mcp);
if (this.datahubConf.isLogMcps()) {
DataMap map = mcp.data();
String serializedMCP = dataTemplateCodec.mapToString(map);
log.info("emitting mcpw: {}", serializedMCP);
} else {
log.info(
"emitting aspect: {} for urn: {}", mcp.getAspectName(), mcp.getEntityUrn());
}
return emitter.get().emit(mcp);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
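For reference, a stand-alone sketch of the logging toggle added above. It reuses the same ObjectMapper and JacksonDataTemplateCodec setup as the emitter constructor; the DataMap fields are illustrative stand-ins for a real MetadataChangeProposal, and logMcps is a plain local variable here rather than the parsed Spark configuration.

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import java.io.IOException;

public class McpLoggingSketch {
  public static void main(String[] args) throws IOException {
    // Same codec setup as the DatahubEventEmitter constructor.
    ObjectMapper objectMapper =
        new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
    JacksonDataTemplateCodec codec = new JacksonDataTemplateCodec(objectMapper.getFactory());

    // Illustrative stand-in for mcp.data(); real proposals carry many more fields.
    DataMap proposal = new DataMap();
    proposal.put(
        "entityUrn", "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my_dir/my_file.csv,DEV)");
    proposal.put("aspectName", "upstreamLineage");

    boolean logMcps = false; // in the listener this comes from SparkLineageConf.isLogMcps()
    if (logMcps) {
      // Full serialized proposal: useful for debugging, but verbose.
      System.out.println("emitting mcpw: " + codec.mapToString(proposal));
    } else {
      // Terse summary logged when MCP logging is disabled.
      System.out.println(
          "emitting aspect: " + proposal.get("aspectName") + " for urn: " + proposal.get("entityUrn"));
    }
  }
}
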
@@ -31,6 +31,7 @@ public class SparkConfigParser {
public static final String FILE_EMITTER_FILE_NAME = "file.filename";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
public static final String CONFIG_LOG_MCPS = "log.mcps";

public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
@@ -51,6 +52,7 @@ public class SparkConfigParser {

public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
public static final String LEGACY_LINEAGE_CLEANUP = "legacyLineageCleanup.enabled";
public static final String DISABLE_SYMLINK_RESOLUTION = "disableSymlinkResolution";

public static final String STAGE_METADATA_COALESCING = "stage_metadata_coalescing";
@@ -158,6 +160,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
Config sparkConfig, SparkAppContext sparkAppContext) {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.isSpark(true);
builder.filePartitionRegexpPattern(
SparkConfigParser.getFilePartitionRegexpPattern(sparkConfig));
builder.fabricType(SparkConfigParser.getCommonFabricType(sparkConfig));
@@ -172,6 +175,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig));
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
try {
@@ -311,6 +315,13 @@ public static boolean isDatasetMaterialize(Config datahubConfig) {
&& datahubConfig.getBoolean(DATASET_MATERIALIZE_KEY);
}

public static boolean isLogMcps(Config datahubConfig) {
if (datahubConfig.hasPath(CONFIG_LOG_MCPS)) {
return datahubConfig.getBoolean(CONFIG_LOG_MCPS);
}
return true;
}

public static boolean isIncludeSchemaMetadata(Config datahubConfig) {
if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) {
return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA);
@@ -352,6 +363,14 @@ public static boolean isPatchEnabled(Config datahubConfig) {
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}

public static boolean isLegacyLineageCleanupEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)) {
return false;
}
return datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)
&& datahubConfig.getBoolean(LEGACY_LINEAGE_CLEANUP);
}

public static boolean isDisableSymlinkResolution(Config datahubConfig) {
if (!datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION)) {
return false;
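Below is a minimal sketch of the default resolution behind the new isLogMcps and isLegacyLineageCleanupEnabled helpers. It assumes the Config handed to SparkConfigParser is already scoped below the spark.datahub. prefix (so keys look like log.mcps), and the two static helpers re-implement the diff's logic so the example stays self-contained.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;

public class ConfigDefaultsSketch {
  public static void main(String[] args) {
    Config empty = ConfigFactory.empty();
    Config tuned =
        ConfigFactory.parseMap(Map.of("log.mcps", false, "legacyLineageCleanup.enabled", true));

    // log.mcps defaults to true when unset; legacy lineage cleanup defaults to false.
    System.out.println(isLogMcps(empty) + " / " + isLegacyLineageCleanupEnabled(empty)); // true / false
    System.out.println(isLogMcps(tuned) + " / " + isLegacyLineageCleanupEnabled(tuned)); // false / true
  }

  // Mirrors SparkConfigParser.isLogMcps above.
  static boolean isLogMcps(Config c) {
    return !c.hasPath("log.mcps") || c.getBoolean("log.mcps");
  }

  // Mirrors SparkConfigParser.isLegacyLineageCleanupEnabled above.
  static boolean isLegacyLineageCleanupEnabled(Config c) {
    return c.hasPath("legacyLineageCleanup.enabled")
        && c.getBoolean("legacyLineageCleanup.enabled");
  }
}
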
@@ -17,6 +17,7 @@ public class SparkLineageConf {
final DatahubOpenlineageConfig openLineageConf;
@Builder.Default final boolean coalesceEnabled = true;
@Builder.Default final boolean emitCoalescePeriodically = false;
@Builder.Default final boolean logMcps = true;
final SparkAppContext sparkAppContext;
final DatahubEmitterConfig datahubEmitterConfig;
@Builder.Default final List<String> tags = new LinkedList<>();
@@ -32,6 +33,7 @@ public static SparkLineageConf toSparkLineageConf(
SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext);
builder.openLineageConf(datahubOpenlineageConfig);
builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig));
builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig));
if (SparkConfigParser.getTags(sparkConfig) != null) {
builder.tags(Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig))));
}
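As a usage sketch, the new options would typically be set as spark.datahub.* properties on the job; the property prefix, listener class, and REST endpoint below follow the integration's documented conventions and are assumptions rather than part of this diff.

import org.apache.spark.sql.SparkSession;

public class LineageConfigExample {
  public static void main(String[] args) {
    // Master/deploy settings are expected to come from spark-submit.
    SparkSession spark =
        SparkSession.builder()
            .appName("lineage-config-example")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.rest.server", "http://localhost:8080")
            // New in this change: turn off full MCP logging (SparkConfigParser.isLogMcps defaults to true).
            .config("spark.datahub.log.mcps", "false")
            // New in this change: clean up legacy dataset-level fine-grained lineage (defaults to false).
            .config("spark.datahub.legacyLineageCleanup.enabled", "true")
            .getOrCreate();

    // Run the actual job here; the listener emits lineage as stages complete.
    spark.range(10).count();
    spark.stop();
  }
}
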
@@ -814,4 +814,32 @@ public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException
dataset.getUrn().toString());
}
}

public void testProcessMappartitionJob() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.lowerCaseDatasetUrns(true);
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.isSpark(true);

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/map_partition_job.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

assertEquals(1, datahubJob.getInSet().size());
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my_dir/my_file.csv,DEV)",
dataset.getUrn().toString());
}
assertEquals(0, datahubJob.getOutSet().size());
}
}
@@ -0,0 +1,66 @@
{
"eventTime": "2024-11-20T12:59:29.059Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START",
"run": {
"runId": "01902a1e-0b05-750e-b38d-439998f7a853",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "01902a1e-0b05-750e-b38d-439998f7a853"
},
"job": {
"namespace": "default",
"name": "spark_context_session"
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.4.2",
"name": "spark"
},
"spark_jobDetails": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"jobId": 0
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "yarn",
"spark.app.name": "SparkContextSession"
}
}
}
},
"job": {
"namespace": "default",
"name": "spark_context_session.map_partitions_parallel_collection",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "RDD_JOB"
}
}
},
"inputs": [
{
"namespace": "s3://my-bucket",
"name": "my_dir/my_file.csv"
}
],
"outputs": [
{
"namespace": "s3://my-bucket",
"name": "my_dir/my_file.csv"
}
]
}
@@ -16,6 +16,7 @@
@Getter
@ToString
public class DatahubOpenlineageConfig {
@Builder.Default private final boolean isSpark = false;
@Builder.Default private final boolean isStreaming = false;
@Builder.Default private final String pipelineName = null;
private final String platformInstance;
@@ -34,6 +35,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;
@Builder.Default private final boolean lowerCaseDatasetUrns = false;
@Builder.Default private final boolean removeLegacyLineage = false;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
@@ -675,9 +675,30 @@ private static void convertJobToDataJob(
datahubJob.setJobInfo(dji);
DataJobInputOutput inputOutput = new DataJobInputOutput();

boolean inputsEqualOutputs = false;
if ((datahubConf.isSpark())
&& ((event.getInputs() != null && event.getOutputs() != null)
&& (event.getInputs().size() == event.getOutputs().size()))) {
inputsEqualOutputs =
event.getInputs().stream()
.map(OpenLineage.Dataset::getName)
.collect(Collectors.toSet())
.equals(
event.getOutputs().stream()
.map(OpenLineage.Dataset::getName)
.collect(Collectors.toSet()));
if (inputsEqualOutputs) {
log.info(
"Inputs equals Outputs: {}. This is most probably because of an rdd map operation and we only process Inputs",
inputsEqualOutputs);
}
}

processJobInputs(datahubJob, event, datahubConf);

processJobOutputs(datahubJob, event, datahubConf);
if (!inputsEqualOutputs) {
processJobOutputs(datahubJob, event, datahubConf);
}

DataProcessInstanceRunEvent dpire = processDataProcessInstanceResult(event);
datahubJob.setDataProcessInstanceRunEvent(dpire);
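Finally, a stripped-down sketch of the inputsEqualOutputs guard above, with hard-coded dataset names standing in for the OpenLineage inputs and outputs.

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InputsEqualOutputsSketch {
  public static void main(String[] args) {
    // An RDD map-partitions job can report the same dataset as both input and output.
    List<String> inputNames = List.of("my_dir/my_file.csv");
    List<String> outputNames = List.of("my_dir/my_file.csv");

    Set<String> in = inputNames.stream().collect(Collectors.toSet());
    Set<String> out = outputNames.stream().collect(Collectors.toSet());

    boolean inputsEqualOutputs = inputNames.size() == outputNames.size() && in.equals(out);

    // true here, so only the inputs would be converted into lineage.
    System.out.println("skip outputs? " + inputsEqualOutputs);
  }
}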