Skip to content

Commit

Permalink
Upgrade Pinot libraries to 0.11.0
Browse files Browse the repository at this point in the history
  • Loading branch information
elonazoulay authored and martint committed Oct 8, 2022
1 parent 4302296 commit 76395ef
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 55 deletions.
96 changes: 90 additions & 6 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.pinot.version>0.10.0</dep.pinot.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<!--
Project's default for air.test.parallel is 'methods'. By design, 'instances' makes TestNG run tests from one class in a single thread.
As a side effect, it prevents TestNG from initializing multiple test instances upfront, which happens with 'methods'.
Expand All @@ -39,12 +39,17 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.17.2</version>
<version>3.19.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>2.30</version>
<version>2.35</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
Expand Down Expand Up @@ -166,7 +171,7 @@
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<version>0.9.8</version>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>commons-io</groupId>
Expand All @@ -184,6 +189,15 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<!-- Conflicts with log4j-to-slf4j from trino-kafka -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -308,6 +322,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -464,14 +482,14 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8</version>
<version>3.11</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.11</version>
<version>4.4.13</version>
<scope>runtime</scope>
</dependency>

Expand Down Expand Up @@ -568,6 +586,12 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down Expand Up @@ -635,6 +659,66 @@
</rules>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredDependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-epoll</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</dependency>
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>metrics-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>metadata-store-directory-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>zookeeper-api</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl:2.17.1</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
</ignoredDependencies>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
Expand All @@ -43,9 +44,9 @@
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_USE_PLAIN_TEXT;
import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.GRPC_TLS_PREFIX;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.CONFIG_USE_PLAIN_TEXT;
import static org.apache.pinot.common.config.GrpcConfig.GRPC_TLS_PREFIX;

public class PinotGrpcDataFetcher
implements PinotDataFetcher
Expand Down Expand Up @@ -154,13 +155,13 @@ public interface GrpcQueryClientFactory
public static class PlainTextGrpcQueryClientFactory
implements GrpcQueryClientFactory
{
private final GrpcQueryClient.Config config;
private final GrpcConfig config;

@Inject
public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig)
{
requireNonNull(grpcClientConfig, "grpcClientConfig is null");
this.config = new GrpcQueryClient.Config(ImmutableMap.<String, Object>builder()
this.config = new GrpcConfig(ImmutableMap.<String, Object>builder()
.put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes()))
.put(CONFIG_USE_PLAIN_TEXT, String.valueOf(grpcClientConfig.isUsePlainText()))
.buildOrThrow());
Expand All @@ -185,7 +186,7 @@ public static class TlsGrpcQueryClientFactory
private static final String TRUSTSTORE_PASSWORD = GRPC_TLS_PREFIX + "." + "truststore.password";
private static final String SSL_PROVIDER = GRPC_TLS_PREFIX + "." + "ssl.provider";

private final GrpcQueryClient.Config config;
private final GrpcConfig config;

@Inject
public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
Expand All @@ -203,7 +204,7 @@ public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientTlsConfig tlsConfig)
}
tlsConfigBuilder.put(SSL_PROVIDER, tlsConfig.getSslProvider());

this.config = new GrpcQueryClient.Config(tlsConfigBuilder.buildOrThrow());
this.config = new GrpcConfig(tlsConfigBuilder.buildOrThrow());
}

@Override
Expand All @@ -215,8 +216,6 @@ public GrpcQueryClient create(HostAndPort hostAndPort)

public static class PinotGrpcServerQueryClient
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();

private final PinotHostMapper pinotHostMapper;
private final Map<HostAndPort, GrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final int grpcPort;
Expand All @@ -241,7 +240,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
closer.register(queryClient::close);
return queryClient;
});
BrokerRequest brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query);
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder()
.setSql(query)
.setSegments(segments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airlift.configuration.Config;
import io.airlift.units.DataSize;

import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;
import static org.apache.pinot.common.config.GrpcConfig.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE;

public class PinotGrpcServerQueryClientConfig
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
Expand All @@ -31,6 +30,7 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
Expand Down Expand Up @@ -172,7 +172,6 @@ public int getRowLimit()

public static class PinotLegacyServerQueryClient
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
private static final String TRINO_HOST_PREFIX = "trino-pinot-master";

private final String trinoHostId;
Expand Down Expand Up @@ -212,7 +211,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
// TODO: separate into offline and realtime methods
BrokerRequest brokerRequest;
try {
brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query);
brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
}
catch (SqlCompilationException e) {
throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e);
Expand All @@ -229,7 +228,7 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
AsyncQueryResponse asyncQueryResponse =
doWithRetries(pinotRetryCount, requestId -> queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, connectionTimeoutInMillis));
try {
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getResponse();
Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
ImmutableList.Builder<PinotDataTableWithSize> pinotDataTableWithSizeBuilder = ImmutableList.builder();
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : response.entrySet()) {
ServerResponse serverResponse = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.reduce.PostAggregationHandler;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

Expand Down Expand Up @@ -62,7 +62,6 @@

public final class DynamicTableBuilder
{
private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
public static final String OFFLINE_SUFFIX = "_OFFLINE";
public static final String REALTIME_SUFFIX = "_REALTIME";
private static final Set<AggregationFunctionType> NON_NULL_ON_EMPTY_AGGREGATIONS = EnumSet.of(COUNT, DISTINCTCOUNT, DISTINCTCOUNTHLL);
Expand All @@ -77,9 +76,10 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(typeConverter, "typeConverter is null");
String query = schemaTableName.getTableName();
BrokerRequest request = REQUEST_COMPILER.compileToBrokerRequest(query);
BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest(query);
PinotQuery pinotQuery = request.getPinotQuery();
QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(request);
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery);

String tableName = request.getQuerySource().getTableName();
String trinoTableName = stripSuffix(tableName).toLowerCase(ENGLISH);
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableName(trinoTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.core.operator.transform.transformer.datetime.BaseDateTimeTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.DateTimeTransformerFactory;
import org.apache.pinot.core.operator.transform.transformer.datetime.EpochToEpochTransformer;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.DateTimeFormatSpec;

import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -65,8 +67,6 @@
import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer;
import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT;
import static org.apache.pinot.segment.spi.AggregationFunctionType.getAggregationFunctionType;
import static org.apache.pinot.spi.data.DateTimeFormatSpec.validateFormat;
import static org.apache.pinot.spi.data.DateTimeGranularitySpec.validateGranularity;

public class PinotExpressionRewriter
{
Expand Down Expand Up @@ -177,13 +177,13 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
String inputFormat = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
checkDateTimeFormatSpec(inputFormat);
argumentsBuilder.add(forLiteral(inputFormat));
String outputFormat = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
checkDateTimeFormatSpec(outputFormat);
argumentsBuilder.add(forLiteral(outputFormat));
String granularity = object.getArguments().get(3).getLiteral().toUpperCase(ENGLISH);
validateGranularity(granularity);
BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity);
// Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing
checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported");
argumentsBuilder.add(forLiteral(granularity));
return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
}
Expand Down Expand Up @@ -306,15 +306,6 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
}
}

private static void checkDateTimeFormatSpec(String dateTimeFormat)
{
requireNonNull(dateTimeFormat, "dateTimeFormat is null");
validateFormat(dateTimeFormat);
// Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing
DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFormat);
checkState(dateTimeFormatSpec.getSDFPattern() == null, "Unsupported date format: simple date format not supported");
}

private static void verifyIsIdentifierOrFunction(ExpressionContext expressionContext)
{
verify(expressionContext.getType() == IDENTIFIER || expressionContext.getType() == FUNCTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ public static Pattern<FunctionContext> binaryFunction()
});
}

public static Property<FunctionContext, ?, String> transformFunctionName()
{
return Property.optionalProperty("transformFunctionType", functionContext -> {
if (functionContext.getType() == TRANSFORM) {
return Optional.of(functionContext.getFunctionName());
}
return Optional.empty();
});
}

// AggregationFunction Properties
public static Property<FunctionContext, ?, AggregationFunctionType> aggregationFunctionType()
{
Expand Down
Loading

0 comments on commit 76395ef

Please sign in to comment.