Skip to content

Commit

Permalink
Add support for HTTP transport in thrift metastore client
Browse files Browse the repository at this point in the history
  • Loading branch information
vihangk1 authored and ebyhr committed Feb 20, 2024
1 parent ade0930 commit 0da647c
Show file tree
Hide file tree
Showing 38 changed files with 1,424 additions and 79 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ jobs:
- suite-delta-lake-databricks113
- suite-delta-lake-databricks122
- suite-delta-lake-databricks133
- suite-databricks-unity-http-hms
- suite-gcs
- suite-clients
- suite-functions
Expand Down Expand Up @@ -877,6 +878,11 @@ jobs:
- suite: suite-delta-lake-databricks133
ignore exclusion if: >-
${{ env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || secrets.DATABRICKS_TOKEN != '' }}
- suite: suite-databricks-unity-http-hms
config: hdp3
- suite: suite-databricks-unity-http-hms
ignore exclusion if: >-
${{ env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || secrets.DATABRICKS_TOKEN != '' }}
ignore exclusion if:
# Do not use this property outside of the matrix configuration.
Expand Down Expand Up @@ -923,6 +929,10 @@ jobs:
DATABRICKS_113_JDBC_URL:
DATABRICKS_122_JDBC_URL:
DATABRICKS_133_JDBC_URL:
DATABRICKS_UNITY_JDBC_URL:
DATABRICKS_UNITY_CATALOG_NAME:
DATABRICKS_UNITY_EXTERNAL_LOCATION:
DATABRICKS_HOST:
DATABRICKS_LOGIN:
DATABRICKS_TOKEN:
GCP_CREDENTIALS_KEY:
Expand Down Expand Up @@ -986,6 +996,10 @@ jobs:
DATABRICKS_113_JDBC_URL: ${{ secrets.DATABRICKS_113_JDBC_URL }}
DATABRICKS_122_JDBC_URL: ${{ secrets.DATABRICKS_122_JDBC_URL }}
DATABRICKS_133_JDBC_URL: ${{ secrets.DATABRICKS_133_JDBC_URL }}
DATABRICKS_UNITY_JDBC_URL: ${{ secrets.DATABRICKS_UNITY_JDBC_URL }}
DATABRICKS_UNITY_CATALOG_NAME: ${{ vars.DATABRICKS_UNITY_CATALOG_NAME }}
DATABRICKS_UNITY_EXTERNAL_LOCATION: ${{ vars.DATABRICKS_UNITY_EXTERNAL_LOCATION }}
DATABRICKS_HOST: ${{ vars.DATABRICKS_HOST }}
DATABRICKS_LOGIN: token
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
GCP_CREDENTIALS_KEY: ${{ secrets.GCP_CREDENTIALS_KEY }}
Expand Down
20 changes: 20 additions & 0 deletions docs/src/main/sphinx/connector/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,26 @@ properties:
* - `hive.metastore.thrift.txn-lock-max-wait`
- Maximum time to wait to acquire hive transaction lock.
- `10m`
* - ``hive.metastore.http.client.bearer-token``
- Bearer token used to authenticate with the metastore service
when https transport mode is used. This must not be set when
using http url.
-
* - ``hive.metastore.http.client.additional-headers``
- Additional headers which can be sent to the metastore service
http thrift requests when using the http transport mode. These
headers must be comma-separated and delimited using ``:``. E.g
header1:value1,header2:value2 sends two headers header1 and
header2 with their values as value1 and value2 respectively.
If you need to use a comma(``,``) or colon(``:``) in a header name
or value, escape it using a backslash (``\``).
-
* - ``hive.metastore.http.client.authentication.type``
- The authentication type to be used for the http metastore client.
Currently, the only supported type is ``BEARER``. When set to ``BEARER``
the token configured in ``hive.metastore.http.client.bearer-token``
is used to authenticate the client to the http metastore service.
-
:::

(hive-glue-metastore)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected QueryRunner createQueryRunner()
"hive",
ImmutableMap.<String, String>builder()
.put("hive.metastore", "thrift")
.put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint())
.put("hive.metastore.uri", hiveHadoop.getHiveMetastoreEndpoint().toString())
.put("hive.allow-drop-table", "true")
.putAll(hiveStorageConfiguration())
.buildOrThrow());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public static QueryRunner createDockerizedDeltaLakeQueryRunner(
.setCoordinatorProperties(coordinatorProperties)
.addExtraProperties(extraProperties)
.setDeltaProperties(ImmutableMap.<String, String>builder()
.put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint())
.put("hive.metastore.uri", hiveHadoop.getHiveMetastoreEndpoint().toString())
.putAll(connectorProperties)
.buildOrThrow())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected QueryRunner createQueryRunner()
"hive",
ImmutableMap.<String, String>builder()
.put("hive.metastore", "thrift")
.put("hive.metastore.uri", "thrift://" + hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.put("hive.metastore.uri", hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
.put("hive.allow-drop-table", "true")
.putAll(s3Properties)
.buildOrThrow());
Expand Down
38 changes: 38 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TODO (https://github.com/trinodb/trino/issues/11294) remove when we upgrade to surefire with https://issues.apache.org/jira/browse/SUREFIRE-1967
-->
<air.test.parallel>instances</air.test.parallel>
<dep.http.version>5.2.1</dep.http.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -207,6 +208,18 @@
<artifactId>avro</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>${dep.http.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>${dep.http.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
Expand Down Expand Up @@ -280,6 +293,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>node</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
Expand Down Expand Up @@ -310,18 +329,37 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.13</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-jakarta-servlet-api</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-server</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>junit-extensions</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.base.ssl.SslUtils.createSSLContext;
import static java.lang.Math.toIntExact;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class DefaultThriftMetastoreClientFactory
Expand Down Expand Up @@ -89,10 +92,17 @@ public DefaultThriftMetastoreClientFactory(
}

@Override
public ThriftMetastoreClient create(HostAndPort address, Optional<String> delegationToken)
public ThriftMetastoreClient create(URI uri, Optional<String> delegationToken)
throws TTransportException
{
return create(() -> createTransport(address, delegationToken), hostname);
return create(() -> getTransportSupplier(uri, delegationToken), hostname);
}

private TTransport getTransportSupplier(URI uri, Optional<String> delegationToken)
throws TTransportException
{
checkArgument(uri.getScheme().toLowerCase(ENGLISH).equals("thrift"), "Invalid metastore uri scheme %s", uri.getScheme());
return createTransport(HostAndPort.fromParts(uri.getHost(), uri.getPort()), delegationToken);
}

protected ThriftMetastoreClient create(TransportSupplier transportSupplier, String hostname)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.plugin.hive.metastore.thrift;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.trino.spi.NodeManager;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import javax.net.ssl.SSLContext;

import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.toIntExact;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class HttpThriftMetastoreClientFactory
implements ThriftMetastoreClientFactory
{
private final int readTimeoutMillis;
private final String hostname;
private final Optional<ThriftHttpMetastoreConfig.AuthenticationMode> authenticationMode;
private final Optional<String> token;
private final Map<String, String> additionalHeaders;

@Inject
public HttpThriftMetastoreClientFactory(
ThriftHttpMetastoreConfig httpMetastoreConfig,
NodeManager nodeManager)
{
this.readTimeoutMillis = toIntExact(httpMetastoreConfig.getReadTimeout().toMillis());
this.hostname = requireNonNull(nodeManager.getCurrentNode().getHost(), "hostname is null");
this.authenticationMode = httpMetastoreConfig.getAuthenticationMode();
this.token = httpMetastoreConfig.getHttpBearerToken();
this.additionalHeaders = ImmutableMap.copyOf(httpMetastoreConfig.getAdditionalHeaders());
}

@Override
public ThriftMetastoreClient create(URI uri, Optional<String> delegationToken)
throws TTransportException
{
return new ThriftHiveMetastoreClient(
() -> createHttpTransport(uri),
hostname,
new MetastoreSupportsDateStatistics(),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE),
new AtomicInteger(Integer.MAX_VALUE));
}

private TTransport createHttpTransport(URI uri)
throws TTransportException
{
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
if ("https".equals(uri.getScheme().toLowerCase(ENGLISH))) {
checkArgument(token.isPresent(), "'hive.metastore.http.client.bearer-token' must be set while using https metastore URIs in 'hive.metastore.uri'");
checkArgument(authenticationMode.isPresent(), "'hive.metastore.http.client.authentication.type' must be set while using http/https metastore URIs in 'hive.metastore.uri'");
SSLConnectionSocketFactory socketFactory;
try {
socketFactory = new SSLConnectionSocketFactory(SSLContext.getDefault(), new DefaultHostnameVerifier());
}
catch (NoSuchAlgorithmException e) {
throw new TTransportException(e);
}
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("https", socketFactory)
.build();
httpClientBuilder.setConnectionManager(new BasicHttpClientConnectionManager(registry));
httpClientBuilder.addRequestInterceptorFirst((httpRequest, entityDetails, httpContext) -> httpRequest.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + token.get()));
}
else {
checkArgument(token.isEmpty(), "'hive.metastore.http.client.bearer-token' must not be set while using http metastore URIs in 'hive.metastore.uri'");
}
httpClientBuilder.addRequestInterceptorFirst((httpRequest, entityDetails, httpContext) -> additionalHeaders.forEach(httpRequest::addHeader));
httpClientBuilder.setDefaultRequestConfig(RequestConfig.custom().setResponseTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS).build());
return new THttpClient(uri.toString(), httpClientBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Splitter;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.NotNull;

import java.net.URI;
Expand Down Expand Up @@ -67,4 +68,23 @@ public StaticMetastoreConfig setMetastoreUsername(String metastoreUsername)
this.metastoreUsername = metastoreUsername;
return this;
}

@AssertFalse(message = "'hive.metastore.uri' cannot contain both http and https URI schemes")
public boolean isMetastoreHttpUrisValid()
{
boolean hasHttpMetastore = metastoreUris.stream().anyMatch(uri -> "http".equalsIgnoreCase(uri.getScheme()));
boolean hasHttpsMetastore = metastoreUris.stream().anyMatch(uri -> "https".equalsIgnoreCase(uri.getScheme()));
if (hasHttpsMetastore || hasHttpMetastore) {
return hasHttpMetastore && hasHttpsMetastore;
}
return false;
}

@AssertFalse(message = "'hive.metastore.uri' cannot contain both http(s) and thrift URI schemes")
public boolean isEitherThriftOrHttpMetastore()
{
boolean hasHttpMetastore = metastoreUris.stream().anyMatch(uri -> "http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme()));
boolean hasThriftMetastore = metastoreUris.stream().anyMatch(uri -> "thrift".equalsIgnoreCase(uri.getScheme()));
return hasHttpMetastore && hasThriftMetastore;
}
}
Loading

0 comments on commit 0da647c

Please sign in to comment.