From 6f87445a88bc3b1a4ec25b66856a05cc11ada438 Mon Sep 17 00:00:00 2001 From: Egbert van der Wal Date: Mon, 16 Jan 2023 21:50:51 +0100 Subject: [PATCH 1/3] Add withDefaultHeaders to connection configuration for ElasticsearchIO --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index ddae2c224606..321d154c6dcc 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -137,7 +137,8 @@ * } * *

The connection configuration also accepts optional configuration: {@code withUsername()}, - * {@code withPassword()}, {@code withApiKey()} and {@code withBearerToken()}. + * {@code withPassword()}, {@code withApiKey()}, {@code withBearerToken()} and + * {@code withDefaultHeaders()}. * *

You can also specify a query on the {@code read()} using {@code withQuery()}. * @@ -326,6 +327,8 @@ public abstract static class ConnectionConfiguration implements Serializable { public abstract @Nullable String getBearerToken(); + public abstract @Nullable List

getDefaultHeaders(); + public abstract @Nullable String getKeystorePath(); public abstract @Nullable String getKeystorePassword(); @@ -354,6 +357,8 @@ abstract static class Builder { abstract Builder setBearerToken(String bearerToken); + abstract Builder setDefaultHeaders(List
defaultHeaders); + abstract Builder setKeystorePath(String keystorePath); abstract Builder setKeystorePassword(String password); @@ -502,6 +507,8 @@ public ConnectionConfiguration withPassword(String password) { /** * If Elasticsearch authentication is enabled, provide an API key. + * Be aware that you can only use one of {@Code withApiToken()}, {@code withBearerToken()} and + * {@code withDefaultHeaders} at the same time, as they will override eachother. * * @param apiKey the API key used to authenticate to Elasticsearch * @return a {@link ConnectionConfiguration} describes a connection configuration to @@ -514,6 +521,8 @@ public ConnectionConfiguration withApiKey(String apiKey) { /** * If Elasticsearch authentication is enabled, provide a bearer token. + * Be aware that you can only use one of {@Code withApiToken()}, {@code withBearerToken()} and + * {@code withDefaultHeaders} at the same time, as they will override eachother. * * @param bearerToken the bearer token used to authenticate to Elasticsearch * @return a {@link ConnectionConfiguration} describes a connection configuration to @@ -524,6 +533,21 @@ public ConnectionConfiguration withBearerToken(String bearerToken) { return builder().setBearerToken(bearerToken).build(); } + /** + * For authentication or custom requirements, provide a set if default headers for the client. + * Be aware that you can only use one of {@code withApiToken()}, {@code withBearerToken()} and + * {@code withDefaultHeaders} at the same time, as they will override eachother. + * + * @param defaultHeaders the headers to add to outgoing requests + * @return a {@link ConnectionConfiguration} describes a connection configuration to + * Elasticsearch. + */ + public ConnectionConfiguration withDefaultHeaders(Header [] defaultHeaders) { + checkArgument(defaultHeaders != null, "defaultHeaders can not be null"); + checkArgument(defaultHeaders.length > 0, "defaultHeaders can not be empty"); + return builder().setDefaultHeaders(Arrays.asList(defaultHeaders)).build(); + } + /** * If Elasticsearch uses SSL/TLS with mutual authentication (via shield), provide the keystore * containing the client key. @@ -640,6 +664,9 @@ RestClient createClient() throws IOException { restClientBuilder.setDefaultHeaders( new Header[] {new BasicHeader("Authorization", "Bearer " + getBearerToken())}); } + if (getDefaultHeaders() != null) { + restClientBuilder.setDefaultHeaders(getDefaultHeaders().toArray(Header[]::new)); + } restClientBuilder.setHttpClientConfigCallback( httpClientBuilder -> { From de21be6e1800289876c977678ebeffa8a2e355d3 Mon Sep 17 00:00:00 2001 From: Egbert van der Wal Date: Tue, 17 Jan 2023 09:20:49 +0100 Subject: [PATCH 2/3] Address PR comments, run spotless --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 321d154c6dcc..680802c4428b 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -137,8 +137,8 @@ * } * *

The connection configuration also accepts optional configuration: {@code withUsername()}, - * {@code withPassword()}, {@code withApiKey()}, {@code withBearerToken()} and - * {@code withDefaultHeaders()}. + * {@code withPassword()}, {@code withApiKey()}, {@code withBearerToken()} and {@code + * withDefaultHeaders()}. * *

You can also specify a query on the {@code read()} using {@code withQuery()}. * @@ -506,9 +506,9 @@ public ConnectionConfiguration withPassword(String password) { } /** - * If Elasticsearch authentication is enabled, provide an API key. - * Be aware that you can only use one of {@Code withApiToken()}, {@code withBearerToken()} and - * {@code withDefaultHeaders} at the same time, as they will override eachother. + * If Elasticsearch authentication is enabled, provide an API key. Be aware that you can only + * use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code withDefaultHeaders} + * at the same time, as they (potentially) use the same header. * * @param apiKey the API key used to authenticate to Elasticsearch * @return a {@link ConnectionConfiguration} describes a connection configuration to @@ -516,13 +516,15 @@ public ConnectionConfiguration withPassword(String password) { */ public ConnectionConfiguration withApiKey(String apiKey) { checkArgument(!Strings.isNullOrEmpty(apiKey), "apiKey can not be null or empty"); + checkArgument(getBearerToken() == null, "apiKey can not be combined with bearerToken"); + checkArgument(getDefaultHeaders() == null, "apiKey can not be combined with defaultHeaders"); return builder().setApiKey(apiKey).build(); } /** - * If Elasticsearch authentication is enabled, provide a bearer token. - * Be aware that you can only use one of {@Code withApiToken()}, {@code withBearerToken()} and - * {@code withDefaultHeaders} at the same time, as they will override eachother. + * If Elasticsearch authentication is enabled, provide a bearer token. Be aware that you can + * only use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code + * withDefaultHeaders} at the same time, as they (potentially) use the same header. * * @param bearerToken the bearer token used to authenticate to Elasticsearch * @return a {@link ConnectionConfiguration} describes a connection configuration to @@ -530,21 +532,26 @@ public ConnectionConfiguration withApiKey(String apiKey) { */ public ConnectionConfiguration withBearerToken(String bearerToken) { checkArgument(!Strings.isNullOrEmpty(bearerToken), "bearerToken can not be null or empty"); + checkArgument(getApiKey() == null, "bearerToken can not be combined with apiKey"); + checkArgument(getDefaultHeaders() == null, "apiKey can not be combined with defaultHeaders"); return builder().setBearerToken(bearerToken).build(); } /** * For authentication or custom requirements, provide a set if default headers for the client. * Be aware that you can only use one of {@code withApiToken()}, {@code withBearerToken()} and - * {@code withDefaultHeaders} at the same time, as they will override eachother. + * {@code withDefaultHeaders} at the same time, as they (potentially) use the same header. * * @param defaultHeaders the headers to add to outgoing requests * @return a {@link ConnectionConfiguration} describes a connection configuration to * Elasticsearch. */ - public ConnectionConfiguration withDefaultHeaders(Header [] defaultHeaders) { + public ConnectionConfiguration withDefaultHeaders(Header[] defaultHeaders) { checkArgument(defaultHeaders != null, "defaultHeaders can not be null"); checkArgument(defaultHeaders.length > 0, "defaultHeaders can not be empty"); + checkArgument(getApiKey() == null, "defaultHeaders can not be combined with apiKey"); + checkArgument( + getBearerToken() == null, "defaultHeaders can not be combined with bearerToken"); return builder().setDefaultHeaders(Arrays.asList(defaultHeaders)).build(); } @@ -665,7 +672,8 @@ RestClient createClient() throws IOException { new Header[] {new BasicHeader("Authorization", "Bearer " + getBearerToken())}); } if (getDefaultHeaders() != null) { - restClientBuilder.setDefaultHeaders(getDefaultHeaders().toArray(Header[]::new)); + Header[] headerList = new Header[getDefaultHeaders().size()]; + restClientBuilder.setDefaultHeaders(getDefaultHeaders().toArray(headerList)); } restClientBuilder.setHttpClientConfigCallback( From a05a9c0d07af81995affff6b835c4c4a8dee29fa Mon Sep 17 00:00:00 2001 From: Egbert van der Wal Date: Wed, 18 Jan 2023 07:49:40 +0100 Subject: [PATCH 3/3] Fix copy/paste error in Javadoc, add example for usage of withDefaultHeaders --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 680802c4428b..f4b3447936b4 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -533,7 +533,8 @@ public ConnectionConfiguration withApiKey(String apiKey) { public ConnectionConfiguration withBearerToken(String bearerToken) { checkArgument(!Strings.isNullOrEmpty(bearerToken), "bearerToken can not be null or empty"); checkArgument(getApiKey() == null, "bearerToken can not be combined with apiKey"); - checkArgument(getDefaultHeaders() == null, "apiKey can not be combined with defaultHeaders"); + checkArgument( + getDefaultHeaders() == null, "bearerToken can not be combined with defaultHeaders"); return builder().setBearerToken(bearerToken).build(); } @@ -542,6 +543,26 @@ public ConnectionConfiguration withBearerToken(String bearerToken) { * Be aware that you can only use one of {@code withApiToken()}, {@code withBearerToken()} and * {@code withDefaultHeaders} at the same time, as they (potentially) use the same header. * + *

An example of where this could be useful is if the client needs to use short-lived + * credentials that need to be renewed on a certain interval. To implement that, a user could + * implement a custom header that tracks the renewal period, for example: + * + *

+     * {@code class OAuthTokenHeader extends BasicHeader {
+     *     OAuthToken accessToken;
+     *
+     *     ...
+     *
+     *     @Override
+     *     public String getValue() {
+     *         if (accessToken.isExpired()) {
+     *             accessToken.renew();
+     *         }
+     *         return String.format("Bearer %s", accessToken.getToken());
+     *     }
+     * }}
+     * 
+ * * @param defaultHeaders the headers to add to outgoing requests * @return a {@link ConnectionConfiguration} describes a connection configuration to * Elasticsearch.