Skip to content

Commit

Permalink
[FLINK-36077][Connectors/Google PubSub] Implement table api support f…
Browse files Browse the repository at this point in the history
…or SinkV2
  • Loading branch information
vahmed-hamdy authored Nov 5, 2024
1 parent e3438af commit 303f4db
Show file tree
Hide file tree
Showing 12 changed files with 1,356 additions and 0 deletions.
21 changes: 21 additions & 0 deletions flink-connector-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
Expand All @@ -82,6 +88,21 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;

import java.io.Serializable;
import java.util.Objects;

/** Configuration keys for {@link com.google.cloud.pubsub.v1.Publisher}. */
@PublicEvolving
Expand Down Expand Up @@ -58,6 +59,50 @@ public TransportChannelProvider getTransportChannelProvider() {
return transportChannelProvider.getTransportChannelProvider();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

GcpPublisherConfig that = (GcpPublisherConfig) o;

return Objects.equals(retrySettings, that.retrySettings)
&& Objects.equals(batchingSettings, that.batchingSettings)
&& Objects.equals(credentialsProvider, that.credentialsProvider)
&& Objects.equals(transportChannelProvider, that.transportChannelProvider)
&& Objects.equals(enableCompression, that.enableCompression);
}

@Override
public int hashCode() {
return Objects.hash(
retrySettings,
batchingSettings,
credentialsProvider,
transportChannelProvider,
enableCompression);
}

@Override
public String toString() {
return "GcpPublisherConfig{"
+ "retrySettings="
+ retrySettings
+ ", batchingSettings="
+ batchingSettings
+ ", credentialsProvider="
+ credentialsProvider
+ ", transportChannelProvider="
+ transportChannelProvider
+ ", enableCompression="
+ enableCompression
+ '}';
}

public Boolean getEnableCompression() {
return enableCompression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;

import java.io.Serializable;
import java.util.Objects;

/**
* A serializable transport channel provider for {@link
Expand All @@ -24,4 +25,23 @@ public TransportChannelProvider getTransportChannelProvider() {
}
return transportChannelProvider;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SerializableTransportChannelProvider that = (SerializableTransportChannelProvider) o;

return Objects.equals(transportChannelProvider, that.transportChannelProvider);
}

@Override
public int hashCode() {
return Objects.hash(transportChannelProvider);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.gcp.pubsub.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.GoogleCredentials;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.Optional;

import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
import static org.apache.flink.connector.gcp.pubsub.table.PubSubOptions.CREDENTIALS_FIXED_ACCESS_EXPIRATION_EPOCH_MILLIS;
import static org.apache.flink.connector.gcp.pubsub.table.PubSubOptions.CREDENTIALS_FIXED_ACCESS_TOKEN;
import static org.apache.flink.connector.gcp.pubsub.table.PubSubOptions.CREDENTIALS_FIXED_JSON;

/**
* Factory for creating {@link com.google.api.gax.core.CredentialsProvider} from PubSub Table
* Options.
*/
@Internal
class CredentialProviderFactory {
private static final String PUBSUB_AUTH_SCOPES = "https://www.googleapis.com/auth/pubsub";
private static final String PUBSUB_AUTH_CLOUD_PLATFORM_SCOPES =
"https://www.googleapis.com/auth/cloud-platform";
private static final String CREDENTIALS_PROVIDER_PREFIX = "credentials-provider.";

public CredentialsProvider createCredentialsProvider(
CredentialProviderType credentialType, Map<String, String> options) {
switch (credentialType) {
case DEFAULT:
case GOOGLE_CREDENTIALS:
return defaultCredentialsProviderBuilder().build();
case FIXED:
return FixedCredentialsProvider.create(getFixedCredentialsFromOptions(options));
default:
throw new IllegalArgumentException(
"Unknown credential provider type: " + credentialType);
}
}

private GoogleCredentials getFixedCredentialsFromOptions(Map<String, String> options) {
Preconditions.checkArgument(
options != null
&& (extractCredentialProviderOption(options, CREDENTIALS_FIXED_JSON) != null
|| extractCredentialProviderOption(
options, CREDENTIALS_FIXED_ACCESS_TOKEN)
!= null),
"Fixed credential provider requires '"
+ CREDENTIALS_FIXED_JSON.key()
+ "' or '"
+ CREDENTIALS_FIXED_ACCESS_TOKEN.key()
+ "' options to be set");

Optional<GoogleCredentials> jsonCredsOptional =
Optional.ofNullable(
extractCredentialProviderOption(options, CREDENTIALS_FIXED_JSON))
.map(this::getJsonCredentials);
Optional<GoogleCredentials> accessTokenCredsOptional =
Optional.ofNullable(
extractCredentialProviderOption(
options, CREDENTIALS_FIXED_ACCESS_TOKEN))
.map(
accessToken ->
getAccessTokenCredentials(
accessToken,
extractCredentialProviderOption(
options,
CREDENTIALS_FIXED_ACCESS_EXPIRATION_EPOCH_MILLIS)));

if (!jsonCredsOptional.isPresent() && !accessTokenCredsOptional.isPresent()) {
throw new IllegalArgumentException(
"Fixed credential provider requires '"
+ CREDENTIALS_FIXED_JSON.key()
+ "' or '"
+ CREDENTIALS_FIXED_ACCESS_TOKEN.key()
+ "' options to be set");
}

return jsonCredsOptional.orElseGet(accessTokenCredsOptional::get);
}

private String extractCredentialProviderOption(
Map<String, String> options, ConfigOption<?> option) {
Preconditions.checkNotNull(options);
Preconditions.checkArgument(option.key().startsWith(CREDENTIALS_PROVIDER_PREFIX));
return options.get(option.key().substring(CREDENTIALS_PROVIDER_PREFIX.length()));
}

private GoogleCredentials getJsonCredentials(String credentialsStringJson) {
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(credentialsStringJson));

try {
return GoogleCredentials.fromStream(
new ByteArrayInputStream(credentialsStringJson.getBytes()));
} catch (IOException e) {
throw new IllegalArgumentException(
"Failed to load Google Credentials from file " + credentialsStringJson, e);
}
}

private GoogleCredentials getAccessTokenCredentials(
String credentialsAccessToken, String credentialsAccessExpirationEpochMillis) {

Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(credentialsAccessToken),
"Fixed credential provider requires '"
+ CREDENTIALS_FIXED_JSON.key()
+ "' or '"
+ CREDENTIALS_FIXED_ACCESS_TOKEN.key()
+ "' options to be set");
Long expirationEpochMillis;

try {
expirationEpochMillis =
StringUtils.isNullOrWhitespaceOnly(credentialsAccessExpirationEpochMillis)
? null
: Long.parseLong(credentialsAccessExpirationEpochMillis);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Failed to parse expiration epoch millis: "
+ credentialsAccessExpirationEpochMillis,
e);
}

return GoogleCredentials.create(
getAccessTokenFromOptions(credentialsAccessToken, expirationEpochMillis));
}

private AccessToken getAccessTokenFromOptions(
String credentialsAccessToken, Long credentialsAccessExpirationEpochMillis) {
AccessToken.Builder builder =
AccessToken.newBuilder()
.setTokenValue(credentialsAccessToken)
.setScopes(
Arrays.asList(
PUBSUB_AUTH_CLOUD_PLATFORM_SCOPES, PUBSUB_AUTH_SCOPES));
if (credentialsAccessExpirationEpochMillis != null) {
builder.setExpirationTime(
Date.from(Instant.ofEpochMilli(credentialsAccessExpirationEpochMillis)));
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.gcp.pubsub.table;

import org.apache.flink.annotation.Internal;

/** GCP Credential provider type. */
@Internal
public enum CredentialProviderType {

/** Default Google Credential Providers. */
DEFAULT("default"),

/** {@link com.google.api.gax.core.FixedCredentialsProvider} with the provided credentials. */
FIXED("fixed"),

/** {@link com.google.api.gax.core.GoogleCredentialsProvider} with default credentials. */
GOOGLE_CREDENTIALS("google-credentials");

private final String type;

CredentialProviderType(String type) {
this.type = type;
}

@Override
public String toString() {
return type;
}
}
Loading

0 comments on commit 303f4db

Please sign in to comment.