Skip to content

Commit

Permalink
Merge branch 'master' into snowflake-streams-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
brock-acryl authored Jan 15, 2025
2 parents e0b3c3b + b015fd2 commit f0124e2
Show file tree
Hide file tree
Showing 129 changed files with 10,944 additions and 477 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/metadata-model.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ jobs:
steps:
- name: Check whether upload to datahub is enabled
id: publish
env:
ENABLE_PUBLISH: ${{ secrets.DataHubToken }}
run: |
echo "Enable publish: ${{ env.ENABLE_PUBLISH != '' }}"
echo "publish=${{ env.ENABLE_PUBLISH != '' }}" >> $GITHUB_OUTPUT
echo "Enable publish: ${{ github.repository == 'datahub-project/datahub' }}"
echo "publish=${{ github.repository == 'datahub-project/datahub' }}" >> $GITHUB_OUTPUT
metadata-ingestion-docgen:
runs-on: ubuntu-latest
needs: setup
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ project.ext.externalDependency = [
'kafkaAvroSerde': "io.confluent:kafka-streams-avro-serde:$kafkaVersion",
'kafkaAvroSerializer': 'io.confluent:kafka-avro-serializer:5.1.4',
'kafkaClients': "org.apache.kafka:kafka-clients:$kafkaVersion-ccs",
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.4',
'snappy': 'org.xerial.snappy:snappy-java:1.1.10.5',
'logbackClassic': "ch.qos.logback:logback-classic:$logbackClassic",
'logbackClassicJava8' : "ch.qos.logback:logback-classic:$logbackClassicJava8",
'slf4jApi': "org.slf4j:slf4j-api:$slf4jVersion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataQualityContract;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
Expand Down Expand Up @@ -173,6 +174,8 @@
import com.linkedin.datahub.graphql.resolvers.embed.UpdateEmbedResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityExistsResolver;
import com.linkedin.datahub.graphql.resolvers.entity.EntityPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.LinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.entity.versioning.UnlinkAssetVersionResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchAssignFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.BatchRemoveFormResolver;
import com.linkedin.datahub.graphql.resolvers.form.CreateDynamicFormAssignmentResolver;
Expand Down Expand Up @@ -346,6 +349,7 @@
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
import com.linkedin.datahub.graphql.types.dataplatforminstance.DataPlatformInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.dataproduct.DataProductType;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
Expand Down Expand Up @@ -389,6 +393,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -474,6 +479,7 @@ public class GmsGraphQLEngine {
private final RestrictedService restrictedService;
private ConnectionService connectionService;
private AssertionService assertionService;
private final EntityVersioningService entityVersioningService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -530,6 +536,7 @@ public class GmsGraphQLEngine {
private final FormType formType;
private final IncidentType incidentType;
private final RestrictedType restrictedType;
private final DataProcessInstanceType dataProcessInstanceType;

private final int graphQLQueryComplexityLimit;
private final int graphQLQueryDepthLimit;
Expand Down Expand Up @@ -596,6 +603,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;
this.assertionService = args.assertionService;
this.entityVersioningService = args.entityVersioningService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -649,6 +657,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formType = new FormType(entityClient);
this.incidentType = new IncidentType(entityClient);
this.restrictedType = new RestrictedType(entityClient, restrictedService);
this.dataProcessInstanceType = new DataProcessInstanceType(entityClient, featureFlags);

this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
Expand Down Expand Up @@ -699,7 +708,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
formType,
incidentType,
restrictedType,
businessAttributeType));
businessAttributeType,
dataProcessInstanceType));
this.loadableTypes = new ArrayList<>(entityTypes);
// Extend loadable types with types from the plugins
// This allows us to offer search and browse capabilities out of the box for
Expand Down Expand Up @@ -1024,6 +1034,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("tag", getResolver(tagType))
.dataFetcher("dataFlow", getResolver(dataFlowType))
.dataFetcher("dataJob", getResolver(dataJobType))
.dataFetcher("dataProcessInstance", getResolver(dataProcessInstanceType))
.dataFetcher("glossaryTerm", getResolver(glossaryTermType))
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
Expand Down Expand Up @@ -1386,6 +1397,16 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
"removeBusinessAttribute",
new RemoveBusinessAttributeResolver(this.entityService));
}
if (featureFlags.isEntityVersioning()) {
typeWiring
.dataFetcher(
"linkAssetVersion",
new LinkAssetVersionResolver(this.entityVersioningService, this.featureFlags))
.dataFetcher(
"unlinkAssetVersion",
new UnlinkAssetVersionResolver(
this.entityVersioningService, this.featureFlags));
}
return typeWiring;
});
}
Expand Down Expand Up @@ -3058,6 +3079,35 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
"DataProcessInstance",
typeWiring ->
typeWiring
.dataFetcher(
"dataPlatformInstance",
new LoadableTypeResolver<>(
dataPlatformInstanceType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getDataPlatformInstance() != null
? dataProcessInstance.getDataPlatformInstance().getUrn()
: null;
}))
.dataFetcher(
"platform",
new LoadableTypeResolver<>(
dataPlatformType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getPlatform() != null
? dataProcessInstance.getPlatform().getUrn()
: null;
}))
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
.dataFetcher(
"container",
new LoadableTypeResolver<>(
containerType,
(env) -> {
final DataProcessInstance dpi = env.getSource();
return dpi.getContainer() != null ? dpi.getContainer().getUrn() : null;
}))
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"lineage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class GmsGraphQLEngineArgs {
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;
AssertionService assertionService;
EntityVersioningService entityVersioningService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.LinkVersionInput;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import com.linkedin.metadata.entity.versioning.VersionPropertiesInput;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * GraphQL resolver for the {@code linkAssetVersion} mutation: links an entity as the latest
 * version of a Version Set.
 *
 * <p>Currently only supports linking the latest version, but may be modified later to support
 * inserts.
 */
public class LinkAssetVersionResolver implements DataFetcher<CompletableFuture<String>> {

  private final EntityVersioningService entityVersioningService;
  private final FeatureFlags featureFlags;

  public LinkAssetVersionResolver(
      EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
    this.entityVersioningService = entityVersioningService;
    this.featureFlags = featureFlags;
  }

  /**
   * Links {@code input.linkedEntity} as the latest version of {@code input.versionSet}.
   *
   * @return the urn of the linked entity when the link was ingested, otherwise an empty string
   * @throws IllegalStateException if the entity-versioning feature flag is disabled
   * @throws IllegalArgumentException if {@code input.versionSet} is not a Version Set urn
   * @throws AuthorizationException if the actor may not update both urns
   */
  @Override
  public CompletableFuture<String> get(DataFetchingEnvironment environment) throws Exception {
    // Cheap configuration guard first, before doing any argument binding work.
    // NOTE: an unchecked exception (not a JVM Error) is the appropriate signal for a
    // disabled-feature condition; Errors are reserved for unrecoverable VM failures.
    if (!featureFlags.isEntityVersioning()) {
      throw new IllegalStateException(
          "Entity Versioning is not configured, please enable before attempting to use this feature.");
    }
    final QueryContext context = environment.getContext();
    final LinkVersionInput input =
        bindArgument(environment.getArgument("input"), LinkVersionInput.class);
    Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
    if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
      throw new IllegalArgumentException(
          String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
    }
    Urn entityUrn = UrnUtils.getUrn(input.getLinkedEntity());
    OperationContext opContext = context.getOperationContext();
    // The actor must be allowed to update BOTH the version set and the entity being linked.
    if (!AuthUtil.isAPIAuthorizedEntityUrns(
        opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
      throw new AuthorizationException(
          String.format(
              "%s is unauthorized to %s entities %s and %s",
              opContext.getAuthentication().getActor().toUrnStr(),
              UPDATE,
              input.getVersionSet(),
              input.getLinkedEntity()));
    }
    VersionPropertiesInput versionPropertiesInput =
        new VersionPropertiesInput(
            input.getComment(),
            input.getVersion(),
            input.getSourceTimestamp(),
            input.getSourceCreator());
    return GraphQLConcurrencyUtils.supplyAsync(
        () -> {
          List<IngestResult> linkResults =
              entityVersioningService.linkLatestVersion(
                  opContext, versionSetUrn, entityUrn, versionPropertiesInput);

          // Surface the linked entity's urn only if it appears among the ingest results;
          // an empty string signals that nothing matching was ingested.
          return linkResults.stream()
              .filter(
                  ingestResult -> input.getLinkedEntity().equals(ingestResult.getUrn().toString()))
              .map(ingestResult -> ingestResult.getUrn().toString())
              .findAny()
              .orElse("");
        },
        this.getClass().getSimpleName(),
        "get");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.linkedin.datahub.graphql.resolvers.entity.versioning;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.metadata.Constants.VERSION_SET_ENTITY_NAME;
import static com.linkedin.metadata.authorization.ApiOperation.UPDATE;

import com.datahub.authorization.AuthUtil;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.UnlinkVersionInput;
import com.linkedin.metadata.entity.versioning.EntityVersioningService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.util.concurrent.CompletableFuture;

/**
 * GraphQL resolver for the {@code unlinkAssetVersion} mutation: removes an entity from a Version
 * Set.
 */
public class UnlinkAssetVersionResolver implements DataFetcher<CompletableFuture<Boolean>> {

  private final EntityVersioningService entityVersioningService;
  private final FeatureFlags featureFlags;

  public UnlinkAssetVersionResolver(
      EntityVersioningService entityVersioningService, FeatureFlags featureFlags) {
    this.entityVersioningService = entityVersioningService;
    this.featureFlags = featureFlags;
  }

  /**
   * Unlinks {@code input.unlinkedEntity} from {@code input.versionSet}.
   *
   * @return {@code true} once the unlink call has completed
   * @throws IllegalStateException if the entity-versioning feature flag is disabled
   * @throws IllegalArgumentException if {@code input.versionSet} is not a Version Set urn
   * @throws AuthorizationException if the actor may not update both urns
   */
  @Override
  public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throws Exception {
    // NOTE: an unchecked exception (not a JVM Error) is the appropriate signal for a
    // disabled-feature condition; Errors are reserved for unrecoverable VM failures.
    if (!featureFlags.isEntityVersioning()) {
      throw new IllegalStateException(
          "Entity Versioning is not configured, please enable before attempting to use this feature.");
    }
    final QueryContext context = environment.getContext();
    final UnlinkVersionInput input =
        bindArgument(environment.getArgument("input"), UnlinkVersionInput.class);
    Urn versionSetUrn = UrnUtils.getUrn(input.getVersionSet());
    if (!VERSION_SET_ENTITY_NAME.equals(versionSetUrn.getEntityType())) {
      throw new IllegalArgumentException(
          String.format("Version Set urn %s must be of type Version Set.", input.getVersionSet()));
    }
    Urn entityUrn = UrnUtils.getUrn(input.getUnlinkedEntity());
    OperationContext opContext = context.getOperationContext();
    // The actor must be allowed to update BOTH the version set and the entity being unlinked.
    if (!AuthUtil.isAPIAuthorizedEntityUrns(
        opContext, UPDATE, ImmutableSet.of(versionSetUrn, entityUrn))) {
      throw new AuthorizationException(
          String.format(
              "%s is unauthorized to %s entities %s and %s",
              // Use toUrnStr() so the message carries the actor urn string, consistent with
              // LinkAssetVersionResolver (getActor() alone would print the Actor object).
              opContext.getAuthentication().getActor().toUrnStr(),
              UPDATE,
              input.getVersionSet(),
              input.getUnlinkedEntity()));
    }
    return GraphQLConcurrencyUtils.supplyAsync(
        () -> {
          entityVersioningService.unlinkVersion(opContext, versionSetUrn, entityUrn);
          return true;
        },
        this.getClass().getSimpleName(),
        "get");
  }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
Expand Down Expand Up @@ -28,6 +29,11 @@ public DataPlatformInstance apply(
result.setType(EntityType.DATA_PLATFORM_INSTANCE);
result.setUrn(input.getInstance().toString());
}
result.setPlatform(
DataPlatform.builder()
.setUrn(input.getPlatform().toString())
.setType(EntityType.DATA_PLATFORM)
.build());
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.common.TimeStamp;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import javax.annotation.Nullable;

/**
 * Converts a PDL {@link TimeStamp} into its GraphQL {@link AuditStamp} counterpart.
 *
 * <p>The epoch-millis time is always copied; the actor urn is copied (as a string) only when the
 * source stamp carries one.
 */
public class TimeStampToAuditStampMapper {

  public static final TimeStampToAuditStampMapper INSTANCE = new TimeStampToAuditStampMapper();

  /**
   * Maps {@code input} to an {@link AuditStamp}.
   *
   * @param context the current query context (unused, kept for mapper-signature uniformity)
   * @param input the source timestamp, possibly null
   * @return the mapped stamp, or null when {@code input} is null
   */
  public static AuditStamp map(
      @Nullable final QueryContext context, @Nullable final TimeStamp input) {
    if (input == null) {
      return null;
    }
    final AuditStamp stamp = new AuditStamp();
    stamp.setTime(input.getTime());
    // The actor field is optional on TimeStamp — copy it only when present.
    if (input.hasActor()) {
      stamp.setActor(input.getActor().toString());
    }
    return stamp;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.Domain;
Expand Down Expand Up @@ -225,6 +226,11 @@ public Entity apply(@Nullable QueryContext context, Urn input) {
((BusinessAttribute) partialEntity).setUrn(input.toString());
((BusinessAttribute) partialEntity).setType(EntityType.BUSINESS_ATTRIBUTE);
}
if (input.getEntityType().equals(DATA_PROCESS_INSTANCE_ENTITY_NAME)) {
partialEntity = new DataProcessInstance();
((DataProcessInstance) partialEntity).setUrn(input.toString());
((DataProcessInstance) partialEntity).setType(EntityType.DATA_PROCESS_INSTANCE);
}
return partialEntity;
}
}
Loading

0 comments on commit f0124e2

Please sign in to comment.