From 041e6309bab7e8fcad173d6e92db7988f2c123d6 Mon Sep 17 00:00:00 2001
From: Elon Azoulay
Date: Mon, 7 Feb 2022 16:44:30 -0800
Subject: [PATCH] Fix handling of complex aggregate expressions in Pinot
 passthrough queries
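
Passthrough queries (SELECT ... FROM "SELECT ...") failed when the inner
Pinot query selected an expression over one or more aggregations, e.g.
(SUM(metric_col1) - SUM(metric_col2)) / SUM(metric_col1): aggregate output
columns were resolved via AggregationFunction#getResultColumnName, which
does not match the column names Pinot's PostAggregationHandler produces
for such expressions. Derive the result schema from PostAggregationHandler
instead, detect aggregates anywhere in an expression tree, and treat only
COUNT, DISTINCTCOUNT and DISTINCTCOUNTHLL as returning a non-null value on
an empty group, keeping passthrough results consistent with pushed-down
aggregates.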
---
 .../pinot/query/DynamicTableBuilder.java      | 127 +++++++++++++++--
 .../AbstractPinotIntegrationSmokeTest.java    | 129 +++++++++++++++++-
 ...licate_values_in_columns_realtimeSpec.json |  45 ++++++
 .../duplicate_values_in_columns_schema.json   |  41 ++++++
 4 files changed, 331 insertions(+), 11 deletions(-)
 create mode 100644 plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json
 create mode 100644 plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json

diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
index 28964f347adf..024aa74b1b1f 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTableBuilder.java
@@ -30,19 +30,24 @@
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.reduce.PostAggregationHandler;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.plugin.pinot.PinotColumnHandle.fromNonAggregateColumnHandle;
 import static io.trino.plugin.pinot.PinotColumnHandle.getTrinoTypeFromPinotType;
+import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
 import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
 import static io.trino.plugin.pinot.query.PinotExpressionRewriter.rewriteExpression;
 import static io.trino.plugin.pinot.query.PinotPatterns.WILDCARD;
@@ -57,12 +62,17 @@
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.COUNT;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNT;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.DISTINCTCOUNTHLL;
+import static org.apache.pinot.segment.spi.AggregationFunctionType.getAggregationFunctionType;
 
 public final class DynamicTableBuilder
 {
     private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler();
     public static final String OFFLINE_SUFFIX = "_OFFLINE";
     public static final String REALTIME_SUFFIX = "_REALTIME";
+    private static final Set<AggregationFunctionType> NON_NULL_ON_EMPTY_AGGREGATIONS = EnumSet.of(COUNT, DISTINCTCOUNT, DISTINCTCOUNTHLL);
 
     private DynamicTableBuilder()
     {
@@ -84,14 +94,12 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable
 
         PinotTypeResolver pinotTypeResolver = new PinotTypeResolver(pinotClient, pinotTableName);
         List<PinotColumnHandle> selectColumns = ImmutableList.of();
-        ImmutableMap.Builder<String, Type> aggregateTypesBuilder = ImmutableMap.builder();
+        Map<String, PinotColumnNameAndTrinoType> aggregateTypes = ImmutableMap.of();
         if (queryContext.getAggregationFunctions() != null) {
             checkState(queryContext.getAggregationFunctions().length > 0, "Aggregation Functions is empty");
-            for (AggregationFunction aggregationFunction : queryContext.getAggregationFunctions()) {
-                aggregateTypesBuilder.put(aggregationFunction.getResultColumnName(), toTrinoType(aggregationFunction.getFinalResultColumnType()));
-            }
+            aggregateTypes = getAggregateTypes(schemaTableName, queryContext, columnHandles);
         }
-        Map<String, Type> aggregateTypes = aggregateTypesBuilder.buildOrThrow();
+
         if (queryContext.getSelectExpressions() != null) {
             checkState(!queryContext.getSelectExpressions().isEmpty(), "Pinot selections is empty");
             selectColumns = getPinotColumns(schemaTableName, queryContext.getSelectExpressions(), queryContext.getAliasList(), columnHandles, pinotTypeResolver, aggregateTypes);
@@ -150,7 +158,7 @@ private static Type toTrinoType(DataSchema.ColumnDataType columnDataType)
         throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType);
     }
 
-    private static List<PinotColumnHandle> getPinotColumns(SchemaTableName schemaTableName, List<ExpressionContext> expressions, List<String> aliases, Map<String, ColumnHandle> columnHandles, PinotTypeResolver pinotTypeResolver, Map<String, Type> aggregateTypes)
+    private static List<PinotColumnHandle> getPinotColumns(SchemaTableName schemaTableName, List<ExpressionContext> expressions, List<String> aliases, Map<String, ColumnHandle> columnHandles, PinotTypeResolver pinotTypeResolver, Map<String, PinotColumnNameAndTrinoType> aggregateTypes)
     {
         ImmutableList.Builder<PinotColumnHandle> pinotColumnsBuilder = ImmutableList.builder();
         for (int index = 0; index < expressions.size(); index++) {
@@ -170,22 +178,24 @@ private static List getPinotColumns(SchemaTab
         return pinotColumnsBuilder.build();
     }
 
-    private static PinotColumnHandle getPinotColumnHandle(SchemaTableName schemaTableName, ExpressionContext expressionContext, Optional<String> alias, Map<String, ColumnHandle> columnHandles, PinotTypeResolver pinotTypeResolver, Map<String, Type> aggregateTypes)
+    private static PinotColumnHandle getPinotColumnHandle(SchemaTableName schemaTableName, ExpressionContext expressionContext, Optional<String> alias, Map<String, ColumnHandle> columnHandles, PinotTypeResolver pinotTypeResolver, Map<String, PinotColumnNameAndTrinoType> aggregateTypes)
     {
         ExpressionContext rewritten = rewriteExpression(schemaTableName, expressionContext, columnHandles);
         // If there is no alias, pinot autogenerates the column name:
         String columnName = rewritten.toString();
         String pinotExpression = formatExpression(schemaTableName, rewritten);
         Type trinoType;
-        boolean isAggregate = isAggregate(rewritten);
+        boolean isAggregate = hasAggregate(rewritten);
         if (isAggregate) {
-            trinoType = requireNonNull(aggregateTypes.get(columnName.toLowerCase(ENGLISH)), format("Unexpected aggregate expression: '%s'", rewritten));
+            trinoType = requireNonNull(aggregateTypes.get(columnName).getTrinoType(), format("Unexpected aggregate expression: '%s'", rewritten));
+            // For aggregation queries, the column name is set by the schema returned from PostAggregationHandler, see getAggregateTypes
+            columnName = aggregateTypes.get(columnName).getPinotColumnName();
         }
         else {
             trinoType = getTrinoTypeFromPinotType(pinotTypeResolver.resolveExpressionType(rewritten, schemaTableName, columnHandles));
         }
 
-        return new PinotColumnHandle(alias.orElse(columnName), trinoType, pinotExpression, alias.isPresent(), isAggregate, true, Optional.empty(), Optional.empty());
+        return new PinotColumnHandle(alias.orElse(columnName), trinoType, pinotExpression, alias.isPresent(), isAggregate, isReturnNullOnEmptyGroup(expressionContext), Optional.empty(), Optional.empty());
     }
 
     private static Optional<String> getAlias(List<String> aliases, int index)
@@ -202,6 +212,81 @@ private static boolean isAggregate(ExpressionContext expressionContext)
         return expressionContext.getType() == ExpressionContext.Type.FUNCTION && expressionContext.getFunction().getType() == FunctionContext.Type.AGGREGATION;
     }
 
+    private static boolean hasAggregate(ExpressionContext expressionContext)
+    {
+        switch (expressionContext.getType()) {
+            case IDENTIFIER:
+            case LITERAL:
+                return false;
+            case FUNCTION:
+                if (isAggregate(expressionContext)) {
+                    return true;
+                }
+                for (ExpressionContext argument : expressionContext.getFunction().getArguments()) {
+                    if (hasAggregate(argument)) {
+                        return true;
+                    }
+                }
+                return false;
+        }
+        throw new PinotException(PINOT_EXCEPTION, Optional.empty(), format("Unsupported expression type '%s'", expressionContext.getType()));
+    }
+
+    private static Map<String, PinotColumnNameAndTrinoType> getAggregateTypes(SchemaTableName schemaTableName, QueryContext queryContext, Map<String, ColumnHandle> columnHandles)
+    {
+        // A mapping from pinot expression to the returned pinot column name and trino type
+        // Note: the column name is set by the PostAggregationHandler
+        List<ExpressionContext> aggregateColumnExpressions = queryContext.getSelectExpressions().stream()
+                .filter(DynamicTableBuilder::hasAggregate)
+                .collect(toImmutableList());
+        queryContext = new QueryContext.Builder()
+                .setAliasList(queryContext.getAliasList())
+                .setSelectExpressions(aggregateColumnExpressions)
+                .build();
+        DataSchema preAggregationSchema = getPreAggregationDataSchema(queryContext);
+        PostAggregationHandler postAggregationHandler = new PostAggregationHandler(queryContext, preAggregationSchema);
+        DataSchema postAggregationSchema = postAggregationHandler.getResultDataSchema();
+        ImmutableMap.Builder<String, PinotColumnNameAndTrinoType> aggregationTypesBuilder = ImmutableMap.builder();
+        for (int index = 0; index < postAggregationSchema.size(); index++) {
+            aggregationTypesBuilder.put(
+                    // ExpressionContext#toString performs quoting of literals
+                    // Quoting of identifiers is not done to match the corresponding column name in the ResultTable returned from Pinot. Quoting will be done by `DynamicTablePqlExtractor`.
+                    rewriteExpression(schemaTableName,
+                            aggregateColumnExpressions.get(index),
+                            columnHandles).toString(),
+                    new PinotColumnNameAndTrinoType(
+                            postAggregationSchema.getColumnName(index),
+                            toTrinoType(postAggregationSchema.getColumnDataType(index))));
+        }
+        return aggregationTypesBuilder.buildOrThrow();
+    }
+
+    // Extracted from org.apache.pinot.core.query.reduce.AggregationDataTableReducer
+    private static DataSchema getPreAggregationDataSchema(QueryContext queryContext)
+    {
+        AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
+        int numAggregationFunctions = aggregationFunctions.length;
+        String[] columnNames = new String[numAggregationFunctions];
+        DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregationFunctions];
+        for (int i = 0; i < numAggregationFunctions; i++) {
+            AggregationFunction aggregationFunction = aggregationFunctions[i];
+            columnNames[i] = aggregationFunction.getResultColumnName();
+            columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
+        }
+        return new DataSchema(columnNames, columnDataTypes);
+    }
+
+    // To keep consistent behavior with pushed-down aggregates, only return a non-null value on an empty group
+    // if the top-level function is in NON_NULL_ON_EMPTY_AGGREGATIONS.
+    // For all other cases, keep the same behavior as Pinot, since the same results are likely expected.
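+    // For example, in a passthrough query over an empty group, count(*) and distinctcount(...) return 0
+    // (matching Trino's pushed-down COUNT), while sum(...) and max(...) return null, see testAggregationPushdown.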
+    private static boolean isReturnNullOnEmptyGroup(ExpressionContext expressionContext)
+    {
+        if (isAggregate(expressionContext)) {
+            return !NON_NULL_ON_EMPTY_AGGREGATIONS.contains(getAggregationFunctionType(expressionContext.getFunction().getFunctionName()));
+        }
+        return true;
+    }
+
     private static OptionalLong getOffset(QueryContext queryContext)
     {
         if (queryContext.getOffset() > 0) {
@@ -239,4 +324,26 @@ else if (tableName.toUpperCase(ENGLISH).endsWith(REALTIME_SUFFIX)) {
             return Optional.empty();
         }
     }
+
+    private static class PinotColumnNameAndTrinoType
+    {
+        private final String pinotColumnName;
+        private final Type trinoType;
+
+        public PinotColumnNameAndTrinoType(String pinotColumnName, Type trinoType)
+        {
+            this.pinotColumnName = requireNonNull(pinotColumnName, "pinotColumnName is null");
+            this.trinoType = requireNonNull(trinoType, "trinoType is null");
+        }
+
+        public String getPinotColumnName()
+        {
+            return pinotColumnName;
+        }
+
+        public Type getTrinoType()
+        {
+            return trinoType;
+        }
+    }
 }
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
index bddc8863fe53..5ffcd20763e5 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
@@ -82,6 +82,7 @@ public abstract class AbstractPinotIntegrationSmokeTest
     private static final String JSON_TABLE = "my_table";
     private static final String RESERVED_KEYWORD_TABLE = "reserved_keyword";
     private static final String QUOTES_IN_COLUMN_NAME_TABLE = "quotes_in_column_name";
+    private static final String DUPLICATE_VALUES_IN_COLUMNS_TABLE = "duplicate_values_in_columns";
     // Use a recent value for updated_at to ensure Pinot doesn't clean up records older than retentionTimeValue as defined in the table specs
     private static final Instant initialUpdatedAt = Instant.now().minus(Duration.ofDays(1)).truncatedTo(SECONDS);
     // Use a fixed instant for testing date time functions
@@ -296,6 +297,69 @@ protected QueryRunner createQueryRunner()
         pinot.createSchema(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_schema.json"), QUOTES_IN_COLUMN_NAME_TABLE);
         pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_realtimeSpec.json"), QUOTES_IN_COLUMN_NAME_TABLE);
 
+        // Create a table having multiple columns with duplicate values
+        kafka.createTopic(DUPLICATE_VALUES_IN_COLUMNS_TABLE);
+        Schema duplicateValuesInColumnsAvroSchema = SchemaBuilder.record(DUPLICATE_VALUES_IN_COLUMNS_TABLE).fields()
+                .name("dim_col").type().optional().longType()
+                .name("another_dim_col").type().optional().longType()
+                .name("string_col").type().optional().stringType()
+                .name("another_string_col").type().optional().stringType()
+                .name("metric_col1").type().optional().longType()
+                .name("metric_col2").type().optional().longType()
+                .name("updated_at").type().longType().noDefault()
+                .endRecord();
+
+        ImmutableList.Builder<ProducerRecord<String, GenericRecord>> duplicateValuesInColumnsRecordsBuilder = ImmutableList.builder();
+        duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key0", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema)
+                .set("dim_col", 1000L)
+                .set("another_dim_col", 1000L)
+                .set("string_col", "string1")
+                .set("another_string_col", "string1")
+                .set("metric_col1", 10L)
+                .set("metric_col2", 20L)
.set("updated_at", initialUpdatedAt.plusMillis(1000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key1", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 2000L) + .set("another_dim_col", 2000L) + .set("string_col", "string1") + .set("another_string_col", "string1") + .set("metric_col1", 100L) + .set("metric_col2", 200L) + .set("updated_at", initialUpdatedAt.plusMillis(2000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key2", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 3000L) + .set("another_dim_col", 3000L) + .set("string_col", "string1") + .set("another_string_col", "another_string1") + .set("metric_col1", 1000L) + .set("metric_col2", 2000L) + .set("updated_at", initialUpdatedAt.plusMillis(3000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key1", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 4000L) + .set("another_dim_col", 4000L) + .set("string_col", "string2") + .set("another_string_col", "another_string2") + .set("metric_col1", 100L) + .set("metric_col2", 200L) + .set("updated_at", initialUpdatedAt.plusMillis(4000).toEpochMilli()) + .build())); + duplicateValuesInColumnsRecordsBuilder.add(new ProducerRecord<>(DUPLICATE_VALUES_IN_COLUMNS_TABLE, "key2", new GenericRecordBuilder(duplicateValuesInColumnsAvroSchema) + .set("dim_col", 4000L) + .set("another_dim_col", 4001L) + .set("string_col", "string2") + .set("another_string_col", "string2") + .set("metric_col1", 1000L) + .set("metric_col2", 2000L) + .set("updated_at", initialUpdatedAt.plusMillis(5000).toEpochMilli()) + .build())); + + kafka.sendMessages(duplicateValuesInColumnsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); + pinot.createSchema(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_schema.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); + pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_realtimeSpec.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); + return PinotQueryRunner.createPinotQueryRunner( ImmutableMap.of(), pinotProperties(pinot), @@ -1107,7 +1171,14 @@ public void testAggregationPushdown() " MIN(long_col), MAX(long_col), AVG(long_col), SUM(long_col)," + " MIN(float_col), MAX(float_col), AVG(float_col), SUM(float_col)," + " MIN(double_col), MAX(double_col), AVG(double_col), SUM(double_col)" + - " FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649")).isFullyPushedDown(); + " FROM " + ALL_TYPES_TABLE + " WHERE long_col > 4147483649")) + .isFullyPushedDown(); + + // Ensure that isNullOnEmptyGroup is handled correctly for passthrough queries as well + assertThat(query("SELECT \"count(*)\", \"distinctcounthll(string_col)\", \"distinctcount(string_col)\", \"sum(created_at_seconds)\", \"max(created_at_seconds)\"" + + " FROM \"SELECT count(*), distinctcounthll(string_col), distinctcount(string_col), sum(created_at_seconds), max(created_at_seconds) FROM " + DATE_TIME_FIELDS_TABLE + " WHERE created_at_seconds = 0\"")) + .matches("VALUES (BIGINT '0', BIGINT '0', INTEGER '0', CAST(NULL AS DOUBLE), CAST(NULL AS DOUBLE))") + .isFullyPushedDown(); // Test passthrough queries with no aggregates assertThat(query("SELECT string_col, COUNT(*)," + @@ -1765,4 +1836,60 @@ public void 
@@ -1765,4 +1836,60 @@ public void testLimitAndOffsetWithPushedDownAggregates()
                 " (BIGINT '-3147483640', VARCHAR 'string_8400')")
                 .isFullyPushedDown();
     }
+
+    @Test
+    public void testAggregatePassthroughQueriesWithExpressions()
+    {
+        assertThat(query("SELECT string_col, sum_metric_col1, count_dup_string_col, ratio_metric_col" +
+                " FROM \"SELECT string_col, SUM(metric_col1) AS sum_metric_col1, COUNT(DISTINCT another_string_col) AS count_dup_string_col," +
+                " (SUM(metric_col1) - SUM(metric_col2)) / SUM(metric_col1) AS ratio_metric_col" +
+                " FROM duplicate_values_in_columns WHERE dim_col = another_dim_col" +
+                " GROUP BY string_col" +
+                " ORDER BY string_col\""))
+                .matches("VALUES (VARCHAR 'string1', DOUBLE '1110.0', 2, DOUBLE '-1.0')," +
+                        " (VARCHAR 'string2', DOUBLE '100.0', 1, DOUBLE '-1.0')");
+
+        assertThat(query("SELECT string_col, sum_metric_col1, count_dup_string_col, ratio_metric_col" +
+                " FROM \"SELECT string_col, SUM(metric_col1) AS sum_metric_col1," +
+                " COUNT(DISTINCT another_string_col) AS count_dup_string_col," +
+                " (SUM(metric_col1) - SUM(metric_col2)) / SUM(metric_col1) AS ratio_metric_col" +
+                " FROM duplicate_values_in_columns WHERE dim_col != another_dim_col" +
+                " GROUP BY string_col" +
+                " ORDER BY string_col\""))
+                .matches("VALUES (VARCHAR 'string2', DOUBLE '1000.0', 1, DOUBLE '-1.0')");
+
+        assertThat(query("SELECT DISTINCT string_col, another_string_col" +
+                " FROM \"SELECT string_col, another_string_col" +
+                " FROM duplicate_values_in_columns WHERE dim_col = another_dim_col\""))
+                .matches("VALUES (VARCHAR 'string1', VARCHAR 'string1')," +
+                        " (VARCHAR 'string1', VARCHAR 'another_string1')," +
+                        " (VARCHAR 'string2', VARCHAR 'another_string2')");
+
+        assertThat(query("SELECT string_col, sum_metric_col1" +
+                " FROM \"SELECT string_col," +
+                " SUM(CASE WHEN dim_col = another_dim_col THEN metric_col1 ELSE 0 END) AS sum_metric_col1" +
+                " FROM duplicate_values_in_columns GROUP BY string_col ORDER BY string_col\""))
+                .matches("VALUES (VARCHAR 'string1', DOUBLE '1110.0')," +
+                        " (VARCHAR 'string2', DOUBLE '100.0')");
+
+        assertThat(query("SELECT \"percentile(int_col, 90.0)\"" +
+                " FROM \"SELECT percentile(int_col, 90) FROM " + ALL_TYPES_TABLE + "\""))
+                .matches("VALUES (DOUBLE '56.0')");
+
+        assertThat(query("SELECT bool_col, \"percentile(int_col, 90.0)\"" +
+                " FROM \"SELECT bool_col, percentile(int_col, 90) FROM " + ALL_TYPES_TABLE + " GROUP BY bool_col\""))
+                .matches("VALUES (true, DOUBLE '56.0')," +
+                        " (false, DOUBLE '0.0')");
+
+        assertThat(query("SELECT \"sqrt(percentile(sqrt(int_col),'26.457513110645905'))\"" +
+                " FROM \"SELECT sqrt(percentile(sqrt(int_col), sqrt(700))) FROM " + ALL_TYPES_TABLE + "\""))
+                .matches("VALUES (DOUBLE '2.7108060108295344')");
+
+        assertThat(query("SELECT int_col, \"sqrt(percentile(sqrt(int_col),'26.457513110645905'))\"" +
+                " FROM \"SELECT int_col, sqrt(percentile(sqrt(int_col), sqrt(700))) FROM " + ALL_TYPES_TABLE + " GROUP BY int_col\""))
+                .matches("VALUES (54, DOUBLE '2.7108060108295344')," +
+                        " (55, DOUBLE '2.7232698153315003')," +
+                        " (56, DOUBLE '2.7355647997347607')," +
+                        " (0, DOUBLE '0.0')");
+    }
 }
diff --git a/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json
new file mode 100644
index 000000000000..d97fb7d57756
--- /dev/null
+++ b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_realtimeSpec.json
@@ -0,0 +1,45 @@
+{
+  "tableName": "duplicate_values_in_columns",
+  "tableType": "REALTIME",
+  "segmentsConfig": {
"timeColumnName": "updated_at_seconds", + "timeType": "SECONDS", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "duplicate_values_in_columns", + "replicasPerPartition": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "invertedIndexColumns": [], + "sortedColumn": ["updated_at_seconds"], + "starTreeIndexConfigs": [], + "nullHandlingEnabled": "true", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "LowLevel", + "stream.kafka.topic.name": "duplicate_values_in_columns", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.zk.broker.url": "zookeeper:2181/", + "stream.kafka.broker.list": "kafka:9092", + "realtime.segment.flush.threshold.time": "1m", + "realtime.segment.flush.threshold.size": "0", + "realtime.segment.flush.desired.size": "1M", + "isolation.level": "read_committed", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.consumer.prop.group.id": "pinot_duplicate_values_in_columns" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json new file mode 100644 index 000000000000..6a4ca08cd280 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/duplicate_values_in_columns_schema.json @@ -0,0 +1,41 @@ +{ + "schemaName": "duplicate_values_in_columns", + "dimensionFieldSpecs": [ + { + "name": "dim_col", + "dataType": "LONG" + }, + { + "name": "another_dim_col", + "dataType": "LONG" + }, + { + "name": "string_col", + "dataType": "STRING" + }, + { + "name": "another_string_col", + "dataType": "STRING" + } + ], + "metricFieldSpecs": [ + { + "name": "metric_col1", + "dataType": "LONG" + }, + { + "name": "metric_col2", + "dataType": "LONG" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "updated_at_seconds", + "dataType": "LONG", + "defaultNullValue" : 0, + "format": "1:SECONDS:EPOCH", + "transformFunction": "toEpochSeconds(updated_at)", + "granularity" : "1:SECONDS" + } + ] +}