Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
JCSMP properties providers for new SolaceIO write connector
Browse files Browse the repository at this point in the history
This adds a base class and two providers for the new SolaceIO write
connector. The basic authentication provider just uses a username and
password.

Users can write their own providers to set authentication mechanisms
and other session properties, by extending from the base class. As an
example, we provide a GoogleCloudSecretProvider, to show how to create
your own. This provider can also be used "as is", but it is mainly
meant as an example to implement your own.

In upcoming PRs, I will be submitting the rest of the write
connector. It is thousands of lines of code, so I am splitting in
smaller PRs, to facilitate code reviews.

This PR contributes to apache#31905
  • Loading branch information
iht committed Jul 16, 2024
1 parent b34c014 commit 9929c8d
Show file tree
Hide file tree
Showing 9 changed files with 658 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ class BeamModulePlugin implements Plugin<Project> {
def google_ads_version = "26.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.16"
def google_cloud_secret_manager_version = "2.37.0"
// [bomupgrader] determined by: com.google.cloud:google-cloud-spanner, consistent with: google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.66.0"
def google_code_gson_version = "2.10.1"
Expand Down Expand Up @@ -758,6 +759,7 @@ class BeamModulePlugin implements Plugin<Project> {
// [bomupgrader] the BOM version is set by scripts/tools/bomupgrader.py. If update manually, also update
// libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.39.0",
google_cloud_secret_manager : "com.google.cloud:google-cloud-secretmanager:$google_cloud_secret_manager_version",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_cloud_vertexai : "com.google.cloud:google-cloud-vertexai", // google_cloud_platform_libraries_bom sets version
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation library.java.joda_time
implementation library.java.solace
implementation library.java.google_cloud_core
implementation library.java.google_cloud_secret_manager
implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
implementation library.java.avro
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
*
* }</pre>
*
* <h2>Writing</h2>
*
* TBD
*
* <h3>Authentication</h3>
*
* <p>When reading from Solace, the user must use {@link
Expand Down Expand Up @@ -209,6 +213,12 @@ public class SolaceIO {
};
private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false;

// Part of the new write connector, documentation to be updated in upcoming pull requests
public enum SubmissionMode {
HIGHER_THROUGHPUT,
LOWER_LATENCY
}

/** Get a {@link Topic} object from the topic name. */
static Topic topicFromName(String topicName) {
return JCSMPFactory.onlyInstance().createTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.write.properties;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.JCSMPProperties;

/**
* This is a convenience class to use with basic authentication for the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Write} connector.
*
* <p>This class is used by the connector to initialize the client session. The connector calls the
* {@link #initializeSessionProperties(JCSMPProperties)} and pass that object (with some additions)
* to the broker to create a session.
*
* <p>In this case, the properties that are set are those related to the authentication and the host
* details where the broker is located. The host should include the full host and port details, if
* the port used is not the default (55555).
*
* <p>Example of how to create the provider object:
*
* <pre>{@code
* BasicAuthenticationProvider propsFactory =
* BasicAuthenticationProvider.builder()
* .username("username")
* .password("password")
* .host("host:port")
* .build();
* }</pre>
*/
@AutoValue
public abstract class BasicAuthenticationProvider extends SessionPropertiesProvider {

public abstract String username();

public abstract String password();

public abstract String host();

public abstract String vpnName();

public static Builder builder() {
return new AutoValue_BasicAuthenticationProvider.Builder().vpnName(DEFAULT_VPN_NAME);
}

@AutoValue.Builder
public abstract static class Builder {
/** Username to be used to authenticate with the broker */
public abstract Builder username(String username);

/** Password to be used to authenticate with the broker */
public abstract Builder password(String password);

/**
* The location of the broker, including port details if it is not listening in the default port
*/
public abstract Builder host(String host);

/** Optional. Solace broker VPN name. If not set, "default" is used. */
public abstract Builder vpnName(String vpnName);

public abstract BasicAuthenticationProvider build();
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());

baseProps.setProperty(
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
baseProps.setProperty(JCSMPProperties.USERNAME, username());
baseProps.setProperty(JCSMPProperties.PASSWORD, password());
baseProps.setProperty(JCSMPProperties.HOST, host());
return baseProps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.write.properties;

import com.google.auto.value.AutoValue;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import com.solacesystems.jcsmp.JCSMPProperties;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class implements a {@link SessionPropertiesProvider} that retrieve the basic authentication
* credentials from a Google Cloud Secret Manager secret.
*
* <p>It can be used to avoid having to pass the password as an option of your pipeline. For this
* provider to work, the worker where the job runs needs to have the necessary credentials to access
* the secret. In Dataflow, this implies adding the necessary permissions to the worker service
* account. For other runners, set the credentials in the pipeline options using {@link
* org.apache.beam.sdk.extensions.gcp.options.GcpOptions}.
*
* <p>It also shows how to implement a {@link SessionPropertiesProvider} that depends on using
* external resources to retrieve the Solace session properties. In this case, using the Google
* Cloud Secrete Manager client.
*
* <p>Example of how to create the provider object:
*
* <pre>{@code
* GoogleCloudSecretProvider provider =
* GoogleCloudSecretProvider.builder()
* .username("user")
* .host("host:port")
* .passwordSecretName("secret-name")
* .build();
* }</pre>
*/
@AutoValue
public abstract class GoogleCloudSecretProvider extends SessionPropertiesProvider {
private static final Logger LOG = LoggerFactory.getLogger(GoogleCloudSecretProvider.class);

private static final String PROJECT_NOT_FOUND = "PROJECT-NOT-FOUND";

public abstract String username();

public abstract String host();

public abstract String passwordSecretName();

public abstract String vpnName();

public abstract @Nullable String secretManagerProjectId();

public abstract String passwordSecretVersion();

public static Builder builder() {
return new AutoValue_GoogleCloudSecretProvider.Builder()
.passwordSecretVersion("latest")
.vpnName(DEFAULT_VPN_NAME);
}

@AutoValue.Builder
public abstract static class Builder {

/** Username to be used to authenticate with the broker */
public abstract Builder username(String username);

/**
* The location of the broker, including port details if it is not listening in the default port
*/
public abstract Builder host(String host);

/** The Secret Manager secret name where the password is stored */
public abstract Builder passwordSecretName(String name);

/** Optional. Solace broker VPN name. If not set, "default" is used. */
public abstract Builder vpnName(String name);

/**
* Optional for Dataflow or VMs running on Google Cloud. The project id of the project where the
* secret is stored. If not set, the project id where the job is running is used.
*/
public abstract Builder secretManagerProjectId(String id);

/** Optional. Solace broker password secret version. If not set, "latest" is used. */
public abstract Builder passwordSecretVersion(String version);

// Validate and set project name only if it is not passed by the user
public abstract GoogleCloudSecretProvider build();
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
String password = null;
try {
password = retrieveSecret();
} catch (IOException e) {
throw new RuntimeException(e);
}
return BasicAuthenticationProvider.builder()
.username(username())
.host(host())
.password(password)
.vpnName(vpnName())
.build()
.initializeSessionProperties(baseProperties);
}

private String retrieveSecret() throws IOException {
try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
String projectId =
Optional.ofNullable(secretManagerProjectId()).orElse(getProjectIdFromVmMetadata());
SecretVersionName secretVersionName =
SecretVersionName.of(projectId, passwordSecretName(), passwordSecretVersion());
return client.accessSecretVersion(secretVersionName).getPayload().getData().toStringUtf8();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String getProjectIdFromVmMetadata() throws IOException {
URL metadataUrl =
new URL("http://metadata.google.internal/computeMetadata/v1/project/project-id");
HttpURLConnection connection = (HttpURLConnection) metadataUrl.openConnection();
connection.setRequestProperty("Metadata-Flavor", "Google");

BufferedReader reader =
new BufferedReader(
new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
String output = reader.readLine();
if (output == null || output.isEmpty()) {
LOG.error(
"Cannot retrieve project id from VM metadata, please set a project id in your GoogleCloudSecretProvider.");
}
return output != null ? output : PROJECT_NOT_FOUND;
}
}
Loading

0 comments on commit 9929c8d

Please sign in to comment.