Skip to content

Commit

Permalink
Upgrade Pinot libraries to 0.12.1
Browse files Browse the repository at this point in the history
  • Loading branch information
elonazoulay committed Mar 22, 2023
1 parent 842b7b2 commit f8395ce
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Requirements

To connect to Pinot, you need:

* Pinot 0.10.0 or higher.
* Pinot 0.11.0 or higher.
* Network access from the Trino coordinator and workers to the Pinot controller
nodes. Port 8098 is the default port.

Expand Down
2 changes: 1 addition & 1 deletion plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.pinot.version>0.12.1</dep.pinot.version>
<!--
Project's default for air.test.parallel is 'methods'. By design, 'instances' makes TestNG run tests from one class in a single thread.
As a side effect, it prevents TestNG from initializing multiple test instances upfront, which happens with 'methods'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.trino.plugin.pinot.PinotException;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.datatable.DataTable;

import java.util.List;
import java.util.Map;
Expand All @@ -27,7 +27,7 @@
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.utils.DataTable.EXCEPTION_METADATA_KEY;
import static org.apache.pinot.common.datatable.DataTable.EXCEPTION_METADATA_KEY;

public interface PinotDataFetcher
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.plugin.pinot.client;

import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.datatable.DataTable;

public class PinotDataTableWithSize
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import io.trino.plugin.pinot.query.PinotProxyGrpcRequestBuilder;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
import io.trino.plugin.pinot.PinotSessionProperties;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -189,7 +191,7 @@ public PinotLegacyServerQueryClient(PinotHostMapper pinotHostMapper, PinotConfig
PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
this.brokerMetrics = new BrokerMetrics(registry);
brokerMetrics.initializeGlobalMeters();
queryRouter = new QueryRouter(trinoHostId, brokerMetrics);
queryRouter = new QueryRouter(trinoHostId, brokerMetrics, new ServerRoutingStatsManager(new PinotConfiguration()));
}

private static String getDefaultTrinoId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@
import static io.trino.plugin.pinot.query.PinotPatterns.transformFunction;
import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionType;
import static io.trino.plugin.pinot.query.PinotSqlFormatter.getColumnHandle;
import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.function.TransformFunctionType.DATETIMECONVERT;
import static org.apache.pinot.common.function.TransformFunctionType.DATETRUNC;
import static org.apache.pinot.common.function.TransformFunctionType.TIMECONVERT;
import static org.apache.pinot.common.request.Literal.stringValue;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.FUNCTION;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.LITERAL;
import static org.apache.pinot.common.request.context.ExpressionContext.forFunction;
import static org.apache.pinot.common.request.context.ExpressionContext.forIdentifier;
import static org.apache.pinot.common.request.context.ExpressionContext.forLiteral;
import static org.apache.pinot.common.request.context.ExpressionContext.forLiteralContext;
import static org.apache.pinot.core.operator.transform.function.DateTruncTransformFunction.EXAMPLE_INVOCATION;
import static org.apache.pinot.core.operator.transform.transformer.timeunit.TimeUnitTransformerFactory.getTimeUnitTransformer;
import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT;
Expand Down Expand Up @@ -124,7 +126,7 @@ private static FunctionContext rewriteFunction(FunctionContext functionContext,
{
Optional<FunctionContext> result = Optional.empty();
if (functionContext.getType() == FunctionContext.Type.TRANSFORM) {
RewriteRule<FunctionContext> rule = FUNCTION_RULE_MAP.get(TransformFunctionType.getTransformFunctionType(functionContext.getFunctionName()));
RewriteRule<FunctionContext> rule = FUNCTION_RULE_MAP.get(getTransformFunctionType(functionContext).orElseThrow());
if (rule != null) {
result = applyRule(rule, functionContext, context);
}
Expand Down Expand Up @@ -176,15 +178,15 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex

ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
String inputFormat = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
argumentsBuilder.add(forLiteral(inputFormat));
String outputFormat = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
argumentsBuilder.add(forLiteral(outputFormat));
String granularity = object.getArguments().get(3).getLiteral().toUpperCase(ENGLISH);
String inputFormat = object.getArguments().get(1).getLiteral().getValue().toString().toUpperCase(ENGLISH);
argumentsBuilder.add(forLiteralContext(stringValue(inputFormat)));
String outputFormat = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
argumentsBuilder.add(forLiteralContext(stringValue(outputFormat)));
String granularity = object.getArguments().get(3).getLiteral().getValue().toString().toUpperCase(ENGLISH);
BaseDateTimeTransformer dateTimeTransformer = DateTimeTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, granularity);
// Even if the format is valid, make sure it is not a simple date format: format characters can be ambiguous due to lower casing
checkState(dateTimeTransformer instanceof EpochToEpochTransformer, "Unsupported date format: simple date format not supported");
argumentsBuilder.add(forLiteral(granularity));
argumentsBuilder.add(forLiteralContext(stringValue(granularity)));
return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
}
}
Expand All @@ -209,13 +211,13 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex

ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();
argumentsBuilder.add(rewriteExpression(object.getArguments().get(0), context));
String inputTimeUnitArgument = object.getArguments().get(1).getLiteral().toUpperCase(ENGLISH);
String inputTimeUnitArgument = object.getArguments().get(1).getLiteral().getValue().toString().toUpperCase(ENGLISH);
TimeUnit inputTimeUnit = TimeUnit.valueOf(inputTimeUnitArgument);
String outputTimeUnitArgument = object.getArguments().get(2).getLiteral().toUpperCase(ENGLISH);
String outputTimeUnitArgument = object.getArguments().get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
// Check that this is a valid time unit transform
getTimeUnitTransformer(inputTimeUnit, outputTimeUnitArgument);
argumentsBuilder.add(forLiteral(inputTimeUnitArgument));
argumentsBuilder.add(forLiteral(outputTimeUnitArgument));
argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnitArgument)));
argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnitArgument)));
return new FunctionContext(object.getType(), object.getFunctionName(), argumentsBuilder.build());
}
}
Expand All @@ -240,27 +242,27 @@ public FunctionContext rewrite(FunctionContext object, Captures captures, Contex
ImmutableList.Builder<ExpressionContext> argumentsBuilder = ImmutableList.builder();

checkState(arguments.get(0).getType() == LITERAL, "First argument must be a literal");
String unit = arguments.get(0).getLiteral().toLowerCase(ENGLISH);
argumentsBuilder.add(forLiteral(unit));
String unit = arguments.get(0).getLiteral().getValue().toString().toLowerCase(ENGLISH);
argumentsBuilder.add(forLiteralContext(stringValue(unit)));
verifyIsIdentifierOrFunction(object.getArguments().get(1));
ExpressionContext valueArgument = rewriteExpression(arguments.get(1), context);
argumentsBuilder.add(valueArgument);
if (arguments.size() >= 3) {
checkState(arguments.get(2).getType() == LITERAL, "Unexpected 3rd argument: '%s'", arguments.get(2));
String inputTimeUnitArgument = arguments.get(2).getLiteral().toUpperCase(ENGLISH);
String inputTimeUnitArgument = arguments.get(2).getLiteral().getValue().toString().toUpperCase(ENGLISH);
// Ensure this is a valid TimeUnit
TimeUnit inputTimeUnit = TimeUnit.valueOf(inputTimeUnitArgument);
argumentsBuilder.add(forLiteral(inputTimeUnit.name()));
argumentsBuilder.add(forLiteralContext(stringValue(inputTimeUnit.name())));
if (arguments.size() >= 4) {
checkState(arguments.get(3).getType() == LITERAL, "Unexpected 4th argument '%s'", arguments.get(3));
// Time zone is lower cased inside Pinot
argumentsBuilder.add(arguments.get(3));
if (arguments.size() >= 5) {
checkState(arguments.get(4).getType() == LITERAL, "Unexpected 5th argument: '%s'", arguments.get(4));
String outputTimeUnitArgument = arguments.get(4).getLiteral().toUpperCase(ENGLISH);
String outputTimeUnitArgument = arguments.get(4).getLiteral().getValue().toString().toUpperCase(ENGLISH);
// Ensure this is a valid TimeUnit
TimeUnit outputTimeUnit = TimeUnit.valueOf(outputTimeUnitArgument);
argumentsBuilder.add(forLiteral(outputTimeUnit.name()));
argumentsBuilder.add(forLiteralContext(stringValue(outputTimeUnit.name())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.Optional;

import static io.trino.matching.Pattern.typeOf;
import static org.apache.pinot.common.function.TransformFunctionType.getTransformFunctionType;
import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.FUNCTION;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER;
import static org.apache.pinot.common.request.context.FunctionContext.Type.AGGREGATION;
Expand Down Expand Up @@ -235,7 +235,7 @@ public static Pattern<FunctionContext> binaryFunction()
{
return Property.optionalProperty("transformFunctionType", functionContext -> {
if (functionContext.getType() == TRANSFORM) {
return Optional.of(getTransformFunctionType(functionContext.getFunctionName()));
return getTransformFunctionType(functionContext);
}
return Optional.empty();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static io.trino.plugin.pinot.query.PinotPatterns.transformFunction;
import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionName;
import static io.trino.plugin.pinot.query.PinotPatterns.transformFunctionType;
import static io.trino.plugin.pinot.query.PinotTransformFunctionTypeResolver.getTransformFunctionType;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
Expand Down Expand Up @@ -198,7 +199,7 @@ private static String formatExpression(ExpressionContext expressionContext, Cont
{
switch (expressionContext.getType()) {
case LITERAL:
return singleQuoteValue(expressionContext.getLiteral());
return singleQuoteValue(expressionContext.getLiteral().getValue().toString());
case IDENTIFIER:
if (context.getColumnHandles().isPresent()) {
return quoteIdentifier(getColumnHandle(expressionContext.getIdentifier(), context.getSchemaTableName(), context.getColumnHandles().get()).getColumnName());
Expand All @@ -214,7 +215,7 @@ private static String formatFunction(FunctionContext functionContext, Context co
{
Optional<String> result = Optional.empty();
if (functionContext.getType() == FunctionContext.Type.TRANSFORM) {
Rule<FunctionContext> rule = FUNCTION_RULE_MAP.get(TransformFunctionType.getTransformFunctionType(functionContext.getFunctionName()));
Rule<FunctionContext> rule = FUNCTION_RULE_MAP.get(getTransformFunctionType(functionContext).orElseThrow());

if (rule != null) {
result = applyRule(rule, functionContext, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query;

import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.context.FunctionContext;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.function.TransformFunctionType.SCALAR;
import static org.apache.pinot.core.operator.transform.function.TransformFunctionFactory.canonicalize;

public final class PinotTransformFunctionTypeResolver
{
private PinotTransformFunctionTypeResolver() {}

private static final Map<String, TransformFunctionType> TRANSFORM_FUNCTION_TYPE_MAP;

static
{
Map<String, TransformFunctionType> builder = new HashMap<>();
for (TransformFunctionType transformFunctionType : TransformFunctionType.values()) {
for (String alias : transformFunctionType.getAliases()) {
TransformFunctionType previousValue = builder.put(canonicalize(alias), transformFunctionType);
checkState(previousValue == null || previousValue == transformFunctionType, "Duplicate key with different values for alias '%s', transform function type '%s' and previous value '%s'", canonicalize(alias), transformFunctionType, previousValue);
}
}
TRANSFORM_FUNCTION_TYPE_MAP = Map.copyOf(builder);
}

// Extracted from org.apache.pinot.core.operator.transform.function.TransformFunctionFactory::get
public static Optional<TransformFunctionType> getTransformFunctionType(FunctionContext function)
{
requireNonNull(function, "function is null");
String canonicalizedFunctionName = canonicalize(function.getFunctionName());
TransformFunctionType transformFunctionType = TRANSFORM_FUNCTION_TYPE_MAP.get(canonicalizedFunctionName);
if (transformFunctionType != null) {
return Optional.of(transformFunctionType);
}
if (FunctionRegistry.containsFunction(canonicalizedFunctionName)) {
return Optional.of(SCALAR);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static io.trino.plugin.pinot.PinotQueryRunner.createPinotQueryRunner;
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.RealType.REAL;
Expand Down Expand Up @@ -142,11 +141,6 @@ protected String getPinotImageName()
return PINOT_PREVIOUS_IMAGE_NAME;
}

protected boolean isLatestVersion()
{
return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME);
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand Down Expand Up @@ -308,13 +302,6 @@ private void createAndPopulateTooManyRowsTable(TestingKafka kafka, TestingPinotC
.set("updatedAt", initialUpdatedAt.plusMillis(i * 1000).toEpochMilli())
.build()));
}
// For pinot 0.11.0+: rows with null time column values are ingested
// Only add a null row with a null time column for pinot < 0.11.0
if (!isLatestVersion()) {
// Add a null row, verify it was not ingested as pinot does not accept null time column values.
// The data is verified in testBrokerQueryWithTooManyRowsForSegmentQuery
tooManyRowsRecordsBuilder.add(new ProducerRecord<>(TOO_MANY_ROWS_TABLE, "key" + MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, new GenericRecordBuilder(tooManyRowsAvroSchema).build()));
}
kafka.sendMessages(tooManyRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_rows_schema.json"), TOO_MANY_ROWS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_rows_realtimeSpec.json"), TOO_MANY_ROWS_TABLE);
Expand Down
Loading

0 comments on commit f8395ce

Please sign in to comment.