Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add withDefaultHeaders to connection configuration for ElasticsearchIO #25024

Merged
merged 3 commits into from
Jan 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@
* }</pre>
*
* <p>The connection configuration also accepts optional configuration: {@code withUsername()},
* {@code withPassword()}, {@code withApiKey()} and {@code withBearerToken()}.
* {@code withPassword()}, {@code withApiKey()}, {@code withBearerToken()} and {@code
* withDefaultHeaders()}.
*
* <p>You can also specify a query on the {@code read()} using {@code withQuery()}.
*
Expand Down Expand Up @@ -326,6 +327,8 @@ public abstract static class ConnectionConfiguration implements Serializable {

public abstract @Nullable String getBearerToken();

public abstract @Nullable List<Header> getDefaultHeaders();

public abstract @Nullable String getKeystorePath();

public abstract @Nullable String getKeystorePassword();
Expand Down Expand Up @@ -354,6 +357,8 @@ abstract static class Builder {

abstract Builder setBearerToken(String bearerToken);

abstract Builder setDefaultHeaders(List<Header> defaultHeaders);

abstract Builder setKeystorePath(String keystorePath);

abstract Builder setKeystorePassword(String password);
Expand Down Expand Up @@ -501,29 +506,76 @@ public ConnectionConfiguration withPassword(String password) {
}

/**
* If Elasticsearch authentication is enabled, provide an API key.
* If Elasticsearch authentication is enabled, provide an API key. Be aware that you can only
* use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code withDefaultHeaders}
* at the same time, as they (potentially) use the same header.
*
* @param apiKey the API key used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
*/
public ConnectionConfiguration withApiKey(String apiKey) {
checkArgument(!Strings.isNullOrEmpty(apiKey), "apiKey can not be null or empty");
checkArgument(getBearerToken() == null, "apiKey can not be combined with bearerToken");
checkArgument(getDefaultHeaders() == null, "apiKey can not be combined with defaultHeaders");
return builder().setApiKey(apiKey).build();
}

/**
* If Elasticsearch authentication is enabled, provide a bearer token.
* If Elasticsearch authentication is enabled, provide a bearer token. Be aware that you can
* only use one of {@Code withApiToken()}, {@code withBearerToken()} and {@code
* withDefaultHeaders} at the same time, as they (potentially) use the same header.
*
* @param bearerToken the bearer token used to authenticate to Elasticsearch
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
*/
public ConnectionConfiguration withBearerToken(String bearerToken) {
checkArgument(!Strings.isNullOrEmpty(bearerToken), "bearerToken can not be null or empty");
checkArgument(getApiKey() == null, "bearerToken can not be combined with apiKey");
checkArgument(
getDefaultHeaders() == null, "bearerToken can not be combined with defaultHeaders");
return builder().setBearerToken(bearerToken).build();
}

/**
* For authentication or custom requirements, provide a set if default headers for the client.
* Be aware that you can only use one of {@code withApiToken()}, {@code withBearerToken()} and
* {@code withDefaultHeaders} at the same time, as they (potentially) use the same header.
*
* <p>An example of where this could be useful is if the client needs to use short-lived
* credentials that need to be renewed on a certain interval. To implement that, a user could
* implement a custom header that tracks the renewal period, for example:
*
* <pre>
* {@code class OAuthTokenHeader extends BasicHeader {
* OAuthToken accessToken;
*
* ...
*
* @Override
* public String getValue() {
* if (accessToken.isExpired()) {
* accessToken.renew();
* }
* return String.format("Bearer %s", accessToken.getToken());
* }
* }}
* </pre>
*
* @param defaultHeaders the headers to add to outgoing requests
* @return a {@link ConnectionConfiguration} describes a connection configuration to
* Elasticsearch.
*/
public ConnectionConfiguration withDefaultHeaders(Header[] defaultHeaders) {
egalpin marked this conversation as resolved.
Show resolved Hide resolved
checkArgument(defaultHeaders != null, "defaultHeaders can not be null");
checkArgument(defaultHeaders.length > 0, "defaultHeaders can not be empty");
checkArgument(getApiKey() == null, "defaultHeaders can not be combined with apiKey");
checkArgument(
getBearerToken() == null, "defaultHeaders can not be combined with bearerToken");
return builder().setDefaultHeaders(Arrays.asList(defaultHeaders)).build();
}

/**
* If Elasticsearch uses SSL/TLS with mutual authentication (via shield), provide the keystore
* containing the client key.
Expand Down Expand Up @@ -640,6 +692,10 @@ RestClient createClient() throws IOException {
restClientBuilder.setDefaultHeaders(
new Header[] {new BasicHeader("Authorization", "Bearer " + getBearerToken())});
}
if (getDefaultHeaders() != null) {
Header[] headerList = new Header[getDefaultHeaders().size()];
restClientBuilder.setDefaultHeaders(getDefaultHeaders().toArray(headerList));
}

restClientBuilder.setHttpClientConfigCallback(
httpClientBuilder -> {
Expand Down