Skip to content

Commit

Permalink
Add support for pinot proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored and martint committed Oct 9, 2022
1 parent c9deaf6 commit 4ae6fcd
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Property name Required Description
``pinot.aggregation-pushdown.enabled`` No Push down aggregation queries, default is ``true``.
``pinot.count-distinct-pushdown.enabled`` No Push down count distinct queries to Pinot, default is ``true``.
``pinot.target-segment-page-size`` No Max allowed page size for segment query, default is ``1MB``.
``pinot.proxy.enabled`` No Use Pinot Proxy for controller and broker requests, default is ``false``.
========================================================= ========== ==============================================================================

If ``pinot.controller.authentication.type`` is set to ``PASSWORD`` then both ``pinot.controller.authentication.user`` and
Expand Down Expand Up @@ -101,6 +102,7 @@ Property name Required Description
``pinot.grpc.tls.truststore-path`` No TLS truststore file location for gRPC connection, default is empty.
``pinot.grpc.tls.truststore-password`` No TLS truststore password, default is empty.
``pinot.grpc.tls.ssl-provider`` No SSL provider, default is ``JDK``.
``pinot.grpc.proxy-uri`` No Pinot Rest Proxy gRPC endpoint URI, default is null.
========================================================= ========== ==============================================================================

For more Apache Pinot TLS configurations, please also refer to `Configuring TLS/SSL <https://docs.pinot.apache.org/operators/tutorials/configuring-tls-ssl>`_.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PinotConfig
private boolean aggregationPushdownEnabled = true;
private boolean countDistinctPushdownEnabled = true;
private boolean grpcEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);

@NotEmpty(message = "pinot.controller-urls cannot be empty")
Expand Down Expand Up @@ -245,6 +246,18 @@ public boolean isTlsEnabled()
return "https".equalsIgnoreCase(getControllerUrls().get(0).getScheme());
}

public boolean getProxyEnabled()
{
return proxyEnabled;
}

@Config("pinot.proxy.enabled")
public PinotConfig setProxyEnabled(boolean proxyEnabled)
{
this.proxyEnabled = proxyEnabled;
return this;
}

public DataSize getTargetSegmentPageSize()
{
return this.targetSegmentPageSize;
Expand Down Expand Up @@ -273,4 +286,13 @@ public boolean allUrlSchemesEqual()
.distinct()
.count() == 1;
}

@AssertTrue(message = "Using the rest proxy requires GRPC to be enabled by setting pinot.grpc.enabled=true")
public boolean proxyRestAndGrpcAreRequired()
{
if (proxyEnabled) {
return grpcEnabled;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class PinotClient
private final HttpClient httpClient;
private final PinotHostMapper pinotHostMapper;
private final String scheme;
private final boolean proxyEnabled;

private final NonEvictableLoadingCache<String, List<String>> brokersForTableCache;
private final NonEvictableLoadingCache<Object, Multimap<String, String>> allTablesCache;
Expand Down Expand Up @@ -154,7 +155,9 @@ public PinotClient(
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)).jsonCodec(Schema.class);
this.brokerResponseCodec = requireNonNull(brokerResponseCodec, "brokerResponseCodec is null");
this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null");
requireNonNull(config, "config is null");
this.scheme = config.isTlsEnabled() ? "https" : "http";
this.proxyEnabled = config.getProxyEnabled();

this.controllerUrls = config.getControllerUrls();
this.httpClient = requireNonNull(httpClient, "httpClient is null");
Expand Down Expand Up @@ -228,18 +231,22 @@ private <T> T sendHttpGetToBrokerJson(String table, String path, JsonCodec<T> co
{
ImmutableMultimap.Builder<String, String> additionalHeadersBuilder = ImmutableMultimap.builder();
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> additionalHeadersBuilder.put(AUTHORIZATION, token));
URI brokerPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(table)))
.scheme(scheme)
.appendPath(path)
.build();
HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(table));
URI brokerPathUri = httpUriBuilder.scheme(scheme).appendPath(path).build();
return doHttpActionWithHeadersJson(
Request.Builder.prepareGet().setUri(brokerPathUri),
Optional.empty(),
codec,
additionalHeadersBuilder.build());
}

private HttpUriBuilder getBrokerHttpUriBuilder(String hostAndPort)
{
return proxyEnabled ?
HttpUriBuilder.uriBuilderFrom(getControllerUrl()) :
HttpUriBuilder.uriBuilder().hostAndPort(HostAndPort.fromString(hostAndPort));
}

private URI getControllerUrl()
{
return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size()));
Expand Down Expand Up @@ -531,8 +538,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
{
String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery()));
return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> {
URI queryPathUri = HttpUriBuilder.uriBuilder()
.hostAndPort(HostAndPort.fromString(getBrokerHost(query.getTable())))
HttpUriBuilder httpUriBuilder = getBrokerHttpUriBuilder(getBrokerHost(query.getTable()));
URI queryPathUri = httpUriBuilder
.scheme(scheme)
.appendPath(QUERY_URL_PATH)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@
import io.trino.plugin.pinot.PinotErrorCode;
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

import javax.annotation.PreDestroy;
import javax.inject.Inject;
Expand All @@ -43,6 +41,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.Boolean.FALSE;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT;
Expand Down Expand Up @@ -189,9 +188,14 @@ public static class TlsGrpcQueryClientFactory
private final GrpcConfig config;

@Inject
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig, PinotGrpcServerQueryClientTlsConfig tlsConfig)
{
ImmutableMap.Builder<String, Object> tlsConfigBuilder = ImmutableMap.builder();
requireNonNull(grpcClientConfig, "grpcClientConfig is null");
requireNonNull(tlsConfig, "tlsConfig is null");
ImmutableMap.Builder<String, Object> tlsConfigBuilder = ImmutableMap.<String, Object>builder()
.put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes()))
.put(CONFIG_USE_PLAIN_TEXT, FALSE.toString());

if (tlsConfig.getKeystorePath().isPresent()) {
tlsConfigBuilder.put(KEYSTORE_TYPE, tlsConfig.getKeystoreType());
tlsConfigBuilder.put(KEYSTORE_PATH, tlsConfig.getKeystorePath().get());
Expand Down Expand Up @@ -220,6 +224,7 @@ public static class PinotGrpcServerQueryClient
private final Map<HostAndPort, GrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final int grpcPort;
private final GrpcQueryClientFactory grpcQueryClientFactory;
private final Optional<String> proxyUri;
private final Closer closer;

private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcServerQueryClientConfig pinotGrpcServerQueryClientConfig, GrpcQueryClientFactory grpcQueryClientFactory, Closer closer)
Expand All @@ -229,24 +234,28 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer
this.grpcPort = pinotGrpcServerQueryClientConfig.getGrpcPort();
this.grpcQueryClientFactory = requireNonNull(grpcQueryClientFactory, "grpcQueryClientFactory is null");
this.closer = requireNonNull(closer, "closer is null");
this.proxyUri = pinotGrpcServerQueryClientConfig.getProxyUri();
}

public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, String query, String serverHost, List<String> segments)
{
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort);
// GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> {
GrpcQueryClient queryClient = grpcQueryClientFactory.create(hostAndPort);
GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort);
closer.register(queryClient::close);
return queryClient;
});
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder()
PinotProxyGrpcRequestBuilder grpcRequestBuilder = new PinotProxyGrpcRequestBuilder()
.setSql(query)
.setSegments(segments)
.setEnableStreaming(true)
.setBrokerRequest(brokerRequest);
return new ResponseIterator(client.submit(requestBuilder.build()));
.setEnableStreaming(true);

if (proxyUri.isPresent()) {
grpcRequestBuilder.setHostName(mappedHostAndPort.getHost()).setPort(grpcPort);
}
Server.ServerRequest serverRequest = grpcRequestBuilder.build();
return new ResponseIterator(client.submit(serverRequest));
}

public static class ResponseIterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.airlift.configuration.Config;
import io.airlift.units.DataSize;

import java.util.Optional;

import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;

public class PinotGrpcServerQueryClientConfig
Expand All @@ -24,6 +26,7 @@ public class PinotGrpcServerQueryClientConfig
private int grpcPort = 8090;
private DataSize maxInboundMessageSize = DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE);
private boolean usePlainText = true;
private Optional<String> proxyUri = Optional.empty();

public int getMaxRowsPerSplitForSegmentQueries()
{
Expand Down Expand Up @@ -72,4 +75,18 @@ public PinotGrpcServerQueryClientConfig setUsePlainText(boolean usePlainText)
this.usePlainText = usePlainText;
return this;
}

public Optional<String> getProxyUri()
{
return proxyUri;
}

@Config("pinot.grpc.proxy-uri")
public PinotGrpcServerQueryClientConfig setProxyUri(String proxyUri)
{
if (proxyUri != null && !proxyUri.isEmpty()) {
this.proxyUri = Optional.of(proxyUri);
}
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query;

import com.google.common.collect.ImmutableList;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.spi.utils.CommonConstants;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PinotProxyGrpcRequestBuilder
{
private static final String KEY_OF_PROXY_GRPC_FORWARD_HOST = "FORWARD_HOST";
private static final String KEY_OF_PROXY_GRPC_FORWARD_PORT = "FORWARD_PORT";

private String hostName;
private int port = -1;
private int requestId;
private String brokerId = "unknown";
private boolean enableTrace;
private boolean enableStreaming;
private String payloadType;
private String sql;
private List<String> segments;

public PinotProxyGrpcRequestBuilder setHostName(String hostName)
{
this.hostName = hostName;
return this;
}

public PinotProxyGrpcRequestBuilder setPort(int port)
{
this.port = port;
return this;
}

public PinotProxyGrpcRequestBuilder setRequestId(int requestId)
{
this.requestId = requestId;
return this;
}

public PinotProxyGrpcRequestBuilder setBrokerId(String brokerId)
{
this.brokerId = brokerId;
return this;
}

public PinotProxyGrpcRequestBuilder setEnableTrace(boolean enableTrace)
{
this.enableTrace = enableTrace;
return this;
}

public PinotProxyGrpcRequestBuilder setEnableStreaming(boolean enableStreaming)
{
this.enableStreaming = enableStreaming;
return this;
}

public PinotProxyGrpcRequestBuilder setSql(String sql)
{
payloadType = CommonConstants.Query.Request.PayloadType.SQL;
this.sql = sql;
return this;
}

public PinotProxyGrpcRequestBuilder setSegments(List<String> segments)
{
this.segments = ImmutableList.copyOf(segments);
return this;
}

public Server.ServerRequest build()
{
if (!payloadType.equals(CommonConstants.Query.Request.PayloadType.SQL)) {
throw new RuntimeException("Only [SQL] Payload type is allowed: " + payloadType);
}
Map<String, String> metadata = new HashMap<>();
metadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId));
metadata.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, brokerId);
metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace));
metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming));
metadata.put(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE, payloadType);
if (this.hostName != null) {
metadata.put(KEY_OF_PROXY_GRPC_FORWARD_HOST, this.hostName);
}
if (this.port > 0) {
metadata.put(KEY_OF_PROXY_GRPC_FORWARD_PORT, String.valueOf(this.port));
}
return Server.ServerRequest.newBuilder()
.putAllMetadata(metadata)
.setSql(sql)
.addAllSegments(segments)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setGrpcEnabled(true)
.setProxyEnabled(false)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
}

Expand All @@ -67,6 +68,7 @@ public void testExplicitPropertyMappings()
.put("pinot.aggregation-pushdown.enabled", "false")
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.grpc.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.buildOrThrow();

Expand All @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setGrpcEnabled(false)
.setProxyEnabled(true)
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testDefaults()
.setMaxRowsPerSplitForSegmentQueries(Integer.MAX_VALUE - 1)
.setGrpcPort(8090)
.setUsePlainText(true)
.setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE)));
.setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE))
.setProxyUri(null));
}

@Test
Expand All @@ -44,12 +45,14 @@ public void testExplicitPropertyMappings()
.put("pinot.grpc.port", "8091")
.put("pinot.grpc.use-plain-text", "false")
.put("pinot.grpc.max-inbound-message-size", String.valueOf(DataSize.ofBytes(1)))
.put("pinot.grpc.proxy-uri", "my-pinot-proxy:8094")
.buildOrThrow();
PinotGrpcServerQueryClientConfig expected = new PinotGrpcServerQueryClientConfig()
.setMaxRowsPerSplitForSegmentQueries(10)
.setGrpcPort(8091)
.setUsePlainText(false)
.setMaxInboundMessageSize(DataSize.ofBytes(1));
.setMaxInboundMessageSize(DataSize.ofBytes(1))
.setProxyUri("my-pinot-proxy:8094");
ConfigAssertions.assertFullMapping(properties, expected);
}
}

0 comments on commit 4ae6fcd

Please sign in to comment.