Skip to content

Commit

Permalink
feat(connections): add connection entity type and graphql endpoints (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 authored May 20, 2024
1 parent 187ef12 commit 1240e03
Show file tree
Hide file tree
Showing 23 changed files with 1,008 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ private Constants() {}
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
public static final String BROWSE_PATH_V2_DELIMITER = "␟";
public static final String VERSION_STAMP_FIELD_NAME = "versionStamp";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.linkedin.datahub.graphql.generated.DashboardStatsSummary;
import com.linkedin.datahub.graphql.generated.DashboardUserUsageCounts;
import com.linkedin.datahub.graphql.generated.DataFlow;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubView;
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
Expand Down Expand Up @@ -129,6 +130,7 @@
import com.linkedin.datahub.graphql.resolvers.chart.BrowseV2Resolver;
import com.linkedin.datahub.graphql.resolvers.chart.ChartStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.connection.UpsertConnectionResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardStatsSummaryResolver;
Expand Down Expand Up @@ -306,6 +308,7 @@
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.datahub.graphql.types.common.mappers.OperationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.connection.DataHubConnectionType;
import com.linkedin.datahub.graphql.types.container.ContainerType;
import com.linkedin.datahub.graphql.types.corpgroup.CorpGroupType;
import com.linkedin.datahub.graphql.types.corpuser.CorpUserType;
Expand Down Expand Up @@ -355,6 +358,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -439,6 +443,7 @@ public class GmsGraphQLEngine {
private final ERModelRelationshipService erModelRelationshipService;
private final FormService formService;
private final RestrictedService restrictedService;
private ConnectionService connectionService;

private final BusinessAttributeService businessAttributeService;
private final FeatureFlags featureFlags;
Expand Down Expand Up @@ -472,6 +477,7 @@ public class GmsGraphQLEngine {
private final GlossaryTermType glossaryTermType;
private final GlossaryNodeType glossaryNodeType;
private final AspectType aspectType;
private final DataHubConnectionType connectionType;
private final ContainerType containerType;
private final DomainType domainType;
private final NotebookType notebookType;
Expand Down Expand Up @@ -558,6 +564,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.dataProductService = args.dataProductService;
this.formService = args.formService;
this.restrictedService = args.restrictedService;
this.connectionService = args.connectionService;

this.businessAttributeService = args.businessAttributeService;
this.ingestionConfiguration = Objects.requireNonNull(args.ingestionConfiguration);
Expand Down Expand Up @@ -588,6 +595,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.glossaryTermType = new GlossaryTermType(entityClient);
this.glossaryNodeType = new GlossaryNodeType(entityClient);
this.aspectType = new AspectType(entityClient);
this.connectionType = new DataHubConnectionType(entityClient, secretService);
this.containerType = new ContainerType(entityClient);
this.domainType = new DomainType(entityClient);
this.notebookType = new NotebookType(entityClient);
Expand Down Expand Up @@ -636,6 +644,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
dataJobType,
glossaryTermType,
glossaryNodeType,
connectionType,
containerType,
notebookType,
domainType,
Expand Down Expand Up @@ -753,6 +762,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureRoleResolvers(builder);
configureBusinessAttributeResolver(builder);
configureBusinessAttributeAssociationResolver(builder);
configureConnectionResolvers(builder);
}

private void configureOrganisationRoleResolvers(RuntimeWiring.Builder builder) {
Expand Down Expand Up @@ -803,6 +813,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(LINEAGE_SCHEMA_FILE))
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down Expand Up @@ -3015,4 +3026,29 @@ private void configureBusinessAttributeAssociationResolver(final RuntimeWiring.B
.getBusinessAttribute()
.getUrn())));
}

private void configureConnectionResolvers(final RuntimeWiring.Builder builder) {
builder.type(
"Mutation",
typeWiring ->
typeWiring.dataFetcher(
"upsertConnection",
new UpsertConnectionResolver(connectionService, secretService)));
builder.type(
"Query",
typeWiring -> typeWiring.dataFetcher("connection", getResolver(this.connectionType)));
builder.type(
"DataHubConnection",
typeWiring ->
typeWiring.dataFetcher(
"platform",
new LoadableTypeResolver<>(
this.dataPlatformType,
(env) -> {
final DataHubConnection connection = env.getSource();
return connection.getPlatform() != null
? connection.getPlatform().getUrn()
: null;
})));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.linkedin.metadata.config.ViewsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import com.linkedin.metadata.connection.ConnectionService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class GmsGraphQLEngineArgs {
int graphQLQueryDepthLimit;
boolean graphQLQueryIntrospectionEnabled;
BusinessAttributeService businessAttributeService;
ConnectionService connectionService;

// any fork specific args should go below this line
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.DataHubConnectionDetails;
import com.linkedin.datahub.graphql.generated.DataHubJsonConnection;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.Constants;
import io.datahubproject.metadata.services.SecretService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ConnectionMapper {
/**
* Maps a GMS encrypted connection details object into the decrypted form returned by the GraphQL
* API.
*
* <p>Returns null if the Entity does not have the required aspects: dataHubConnectionDetails or
* dataPlatformInstance.
*/
@Nullable
public static DataHubConnection map(
@Nonnull final QueryContext context,
@Nonnull final EntityResponse entityResponse,
@Nonnull final SecretService secretService) {
// If the connection does not exist, simply return null
if (!hasAspects(entityResponse)) {
return null;
}

final DataHubConnection result = new DataHubConnection();
final Urn entityUrn = entityResponse.getUrn();
final EnvelopedAspectMap aspects = entityResponse.getAspects();

result.setUrn(entityUrn.toString());
result.setType(EntityType.DATAHUB_CONNECTION);

final EnvelopedAspect envelopedAssertionInfo =
aspects.get(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME);
if (envelopedAssertionInfo != null) {
result.setDetails(
mapConnectionDetails(
context,
new com.linkedin.connection.DataHubConnectionDetails(
envelopedAssertionInfo.getValue().data()),
secretService));
}
final EnvelopedAspect envelopedPlatformInstance =
aspects.get(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
if (envelopedPlatformInstance != null) {
final DataMap data = envelopedPlatformInstance.getValue().data();
result.setPlatform(mapPlatform(new DataPlatformInstance(data)));
}
return result;
}

private static DataHubConnectionDetails mapConnectionDetails(
@Nonnull final QueryContext context,
@Nonnull final com.linkedin.connection.DataHubConnectionDetails gmsDetails,
@Nonnull final SecretService secretService) {
final DataHubConnectionDetails result = new DataHubConnectionDetails();
result.setType(
com.linkedin.datahub.graphql.generated.DataHubConnectionDetailsType.valueOf(
gmsDetails.getType().toString()));
if (gmsDetails.hasJson() && ConnectionUtils.canManageConnections(context)) {
result.setJson(mapJsonConnectionDetails(gmsDetails.getJson(), secretService));
}
if (gmsDetails.hasName()) {
result.setName(gmsDetails.getName());
}
return result;
}

private static DataHubJsonConnection mapJsonConnectionDetails(
@Nonnull final com.linkedin.connection.DataHubJsonConnection gmsJsonConnection,
@Nonnull final SecretService secretService) {
final DataHubJsonConnection result = new DataHubJsonConnection();
// Decrypt the BLOB!
result.setBlob(secretService.decrypt(gmsJsonConnection.getEncryptedBlob()));
return result;
}

private static DataPlatform mapPlatform(final DataPlatformInstance platformInstance) {
// Set dummy platform to be resolved.
final DataPlatform partialPlatform = new DataPlatform();
partialPlatform.setUrn(platformInstance.getPlatform().toString());
return partialPlatform;
}

private static boolean hasAspects(@Nonnull final EntityResponse response) {
return response.hasAspects()
&& response.getAspects().containsKey(Constants.DATAHUB_CONNECTION_DETAILS_ASPECT_NAME)
&& response.getAspects().containsKey(Constants.DATA_PLATFORM_INSTANCE_ASPECT_NAME);
}

private ConnectionMapper() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import com.datahub.authorization.AuthUtil;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.metadata.authorization.PoliciesConfig;
import javax.annotation.Nonnull;

/** Utilities for working with DataHub Connections. */
public class ConnectionUtils {

/**
* Returns true if the user is able to read and or write connection between DataHub and external
* platforms.
*/
public static boolean canManageConnections(@Nonnull QueryContext context) {
return AuthUtil.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
PoliciesConfig.MANAGE_CONNECTIONS_PRIVILEGE);
}

private ConnectionUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.linkedin.datahub.graphql.resolvers.connection;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;

import com.datahub.authentication.Authentication;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.connection.DataHubConnectionDetailsType;
import com.linkedin.connection.DataHubJsonConnection;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.DataHubConnection;
import com.linkedin.datahub.graphql.generated.UpsertDataHubConnectionInput;
import com.linkedin.entity.EntityResponse;
import com.linkedin.metadata.connection.ConnectionService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.SecretService;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UpsertConnectionResolver implements DataFetcher<CompletableFuture<DataHubConnection>> {

private final ConnectionService _connectionService;
private final SecretService _secretService;

public UpsertConnectionResolver(
@Nonnull final ConnectionService connectionService,
@Nonnull final SecretService secretService) {
_connectionService =
Objects.requireNonNull(connectionService, "connectionService cannot be null");
_secretService = Objects.requireNonNull(secretService, "secretService cannot be null");
}

@Override
public CompletableFuture<DataHubConnection> get(final DataFetchingEnvironment environment)
throws Exception {

final QueryContext context = environment.getContext();
final UpsertDataHubConnectionInput input =
bindArgument(environment.getArgument("input"), UpsertDataHubConnectionInput.class);
final Authentication authentication = context.getAuthentication();

return CompletableFuture.supplyAsync(
() -> {
if (!ConnectionUtils.canManageConnections(context)) {
throw new AuthorizationException(
"Unauthorized to upsert Connection. Please contact your DataHub administrator for more information.");
}

try {
final Urn connectionUrn =
_connectionService.upsertConnection(
context.getOperationContext(),
input.getId(),
UrnUtils.getUrn(input.getPlatformUrn()),
DataHubConnectionDetailsType.valueOf(input.getType().toString()),
input.getJson() != null
// Encrypt payload
? new DataHubJsonConnection()
.setEncryptedBlob(_secretService.encrypt(input.getJson().getBlob()))
: null,
input.getName());

final EntityResponse connectionResponse =
_connectionService.getConnectionEntityResponse(
context.getOperationContext(), connectionUrn);
return ConnectionMapper.map(context, connectionResponse, _secretService);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to upsert a Connection from input %s", input), e);
}
});
}
}
Loading

0 comments on commit 1240e03

Please sign in to comment.