Skip to content

Commit

Permalink
Add pinot query options to query
Browse files Browse the repository at this point in the history
  • Loading branch information
naman-patel committed May 31, 2024
1 parent 99b9b37 commit cb25080
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 15 deletions.
5 changes: 3 additions & 2 deletions docs/src/main/sphinx/connector/pinot.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional.
### General configuration properties

| Property name | Required | Description |
|--------------------------------------------------------|----------| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|--------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `pinot.controller-urls` | Yes | A comma separated list of controller hosts. If Pinot is deployed via [Kubernetes](https://kubernetes.io/) this needs to point to the controller service endpoint. The Pinot broker and server must be accessible via DNS as Pinot returns hostnames and not IP addresses. |
| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. |
| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. |
| `pinot.connection-timeout` | No | Pinot connection timeout, default is `15s`. |
| `pinot.metadata-expiry` | No | Pinot metadata expiration time, default is `2m`. |
| `pinot.controller.authentication.type` | No | Pinot authentication method for controller requests. Allowed values are `NONE` and `PASSWORD` - defaults to `NONE` which is no authentication. |
Expand All @@ -55,6 +55,7 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional.
| `pinot.count-distinct-pushdown.enabled` | No | Push down count distinct queries to Pinot, default is `true`. |
| `pinot.target-segment-page-size` | No | Max allowed page size for segment query, default is `1MB`. |
| `pinot.proxy.enabled` | No | Use Pinot Proxy for controller and broker requests, default is `false`. |
| `pinot.query-options` | No | Query options to be included with Pinot queries. Default is null. For a varchar / string option, add single quotes (`'`) around the value. Example: `enableNullHandling:true,minReplicas:2,somethingElse:'YES'` |

If `pinot.controller.authentication.type` is set to `PASSWORD` then both `pinot.controller.authentication.user` and
`pinot.controller.authentication.password` are required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import io.trino.plugin.pinot.query.PinotQueryBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class PinotConfig
private boolean countDistinctPushdownEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);
private Optional<String> queryOptions = Optional.empty();

@NotEmpty(message = "pinot.controller-urls cannot be empty")
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -261,6 +263,25 @@ public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize)
return this;
}

/**
 * Returns the configured default query options, or {@code null} when none are set.
 * <p>
 * The returned text is whatever {@link #setQueryOptions(String)} stored, i.e. the result of
 * {@code PinotQueryBuilder.getQueryOptionsString(...)}, not the raw {@code key:value} list.
 */
public String getQueryOptions()
{
    String renderedOptions = queryOptions.orElse(null);
    return renderedOptions;
}

/**
 * Sets the default Pinot query options from a comma separated list of {@code key:value} pairs.
 * <p>
 * The input is not stored verbatim: it is handed to
 * {@code PinotQueryBuilder.getQueryOptionsString(...)}, and the {@code Optional} that call
 * returns is what gets stored. A {@code null} input clears the setting.
 *
 * @param options the raw option list, or {@code null} for no options
 * @return this config instance, for chaining
 */
@Config("pinot.query-options")
@ConfigDescription("Comma separated list of query options. Each option should be in the format key:value. " +
        "For example, enableNullHandling:true,skipUpsert:true,varcharOption:'value'")
public PinotConfig setQueryOptions(String options)
{
    // Parsing doubles as validation: an invalid option list makes the builder throw here.
    queryOptions = (options == null)
            ? Optional.empty()
            : PinotQueryBuilder.getQueryOptionsString(options);
    return this;
}

@PostConstruct
public void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.extractPql;
import static io.trino.plugin.pinot.query.PinotQueryBuilder.generatePql;
Expand Down Expand Up @@ -74,7 +75,8 @@ public ConnectorPageSource createPageSource(
handles.add((PinotColumnHandle) handle);
}
PinotTableHandle pinotTableHandle = (PinotTableHandle) tableHandle;
String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries);
Optional<String> queryOptions = PinotSessionProperties.getQueryOptions(session);
String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries, queryOptions);

switch (pinotSplit.getSplitType()) {
case SEGMENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class PinotSessionProperties
{
Expand All @@ -37,6 +39,7 @@ public class PinotSessionProperties
private static final String SEGMENTS_PER_SPLIT = "segments_per_split";
private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled";
private static final String QUERY_OPTIONS = "query_options";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -84,6 +87,11 @@ public PinotSessionProperties(PinotConfig pinotConfig)
COUNT_DISTINCT_PUSHDOWN_ENABLED,
"Enable count distinct pushdown",
pinotConfig.isCountDistinctPushdownEnabled(),
false),
stringProperty(
QUERY_OPTIONS,
"Pinot query option in the format of key:value,key:value",
pinotConfig.getQueryOptions(),
false));
}

Expand Down Expand Up @@ -130,6 +138,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session)
return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Reads the {@code query_options} session property.
 *
 * @param session the connector session to read from
 * @return the property value, or empty when the property is {@code null}
 */
public static Optional<String> getQueryOptions(ConnectorSession session)
{
    // NOTE(review): the default for this property is PinotConfig#getQueryOptions(), which is
    // already rendered by PinotQueryBuilder.getQueryOptionsString(...). A value supplied through
    // the session (e.g. "skipUpsert:true") looks like it is returned here as-is, in raw
    // key:value form - confirm that callers handle both forms.
    String options = session.getProperty(QUERY_OPTIONS, String.class);
    return Optional.ofNullable(options);
}

public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ public static String extractPql(DynamicTable table, TupleDomain<ColumnHandle> tu
{
StringBuilder builder = new StringBuilder();
Map<String, String> queryOptions = table.queryOptions();
queryOptions.keySet().stream().sorted().forEach(
key -> builder
.append("SET ")
.append(key)
.append(" = ")
.append(format("'%s'", queryOptions.get(key)))
.append(";\n"));
PinotQueryBuilder.getQueryOptions(queryOptions).ifPresent(builder::append);
builder.append("SELECT ");
if (!table.projections().isEmpty()) {
builder.append(table.projections().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.trino.plugin.pinot.PinotColumnHandle;
import io.trino.plugin.pinot.PinotTableHandle;
Expand All @@ -36,8 +37,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand All @@ -53,10 +56,17 @@ private PinotQueryBuilder()
{
}

public static String generatePql(PinotTableHandle tableHandle, List<PinotColumnHandle> columnHandles, Optional<String> tableNameSuffix, Optional<String> timePredicate, int limitForSegmentQueries)
public static String generatePql(
PinotTableHandle tableHandle,
List<PinotColumnHandle> columnHandles,
Optional<String> tableNameSuffix,
Optional<String> timePredicate,
int limitForSegmentQueries,
Optional<String> queryOptions)
{
requireNonNull(tableHandle, "tableHandle is null");
StringBuilder pqlBuilder = new StringBuilder();
queryOptions.ifPresent(pqlBuilder::append);
List<String> quotedColumnNames;
if (columnHandles.isEmpty()) {
// This occurs when the query is SELECT COUNT(*) FROM pinotTable ...
Expand Down Expand Up @@ -227,4 +237,52 @@ private static String quoteIdentifier(String identifier)
{
return format("\"%s\"", identifier.replaceAll("\"", "\"\""));
}

/**
 * Converts a comma separated {@code key:value} option list into the text produced by
 * {@link #getQueryOptions(Map)}.
 *
 * @param options the raw option list; may be {@code null} or empty
 * @return the rendered options, or empty when {@code options} is {@code null}, empty,
 *         or parses to no options
 * @throws IllegalArgumentException if {@link #parseQueryOptions(String)} rejects the input
 */
public static Optional<String> getQueryOptionsString(String options)
{
    return isNullOrEmpty(options)
            ? Optional.empty()
            : getQueryOptions(parseQueryOptions(options));
}

/**
 * Renders query options as one {@code SET key = value} statement per entry, in the map's
 * iteration order, with statements separated by {@code ";\n"}.
 * <p>
 * Values are emitted exactly as given; no quoting is added here.
 *
 * @param queryOptionsMap option names mapped to their already formatted values
 * @return the rendered statements, or empty when the map is empty
 */
public static Optional<String> getQueryOptions(Map<String, String> queryOptionsMap)
{
    if (queryOptionsMap.isEmpty()) {
        return Optional.empty();
    }

    StringBuilder statements = new StringBuilder();
    for (Map.Entry<String, String> option : queryOptionsMap.entrySet()) {
        if (statements.length() > 0) {
            statements.append(";\n");
        }
        statements.append("SET ")
                .append(option.getKey())
                .append(" = ")
                .append(option.getValue());
    }

    // Terminate the final statement, unless the last value already ends with ';'.
    if (statements.charAt(statements.length() - 1) != ';') {
        statements.append(";\n");
    }
    return Optional.of(statements.toString());
}

/**
 * Parses a comma separated list of {@code key:value} pairs into a map of query options.
 * <p>
 * Pairs are separated by {@code ,} and a key is separated from its value by the first
 * {@code :} in the pair. A delimiter preceded by a back-slash does not split; the back-slash
 * itself is left in the resulting key or value. Keys and values are trimmed, and the returned
 * map keeps the order in which the options were listed.
 *
 * @param options the raw option list; may be {@code null} or empty
 * @return an immutable map of option name to value, empty when {@code options} is
 *         {@code null} or empty
 * @throws IllegalArgumentException if a pair has no {@code :} separator, has an empty key or
 *         value, or if a key is repeated
 */
public static Map<String, String> parseQueryOptions(String options)
{
    if (isNullOrEmpty(options)) {
        return ImmutableMap.of();
    }
    // Negative lookbehind: only split on ',' and ':' that are not preceded by a back-slash.
    String optionDelimiter = "(?<!\\\\),";
    String keyValueDelimiter = "(?<!\\\\):";
    ImmutableMap.Builder<String, String> queryOptions = ImmutableMap.builder();
    for (String option : options.split(optionDelimiter)) {
        // Limit 2 so that any further ':' stays part of the value.
        String[] keyValue = option.split(keyValueDelimiter, 2);
        if (keyValue.length != 2) {
            throw new IllegalArgumentException("Invalid format for 'pinot.query-options'. Value provided is :" + options);
        }
        String key = keyValue[0].trim();
        String value = keyValue[1].trim();
        // An empty key or value would later be rendered as an incomplete SET statement.
        if (key.isEmpty() || value.isEmpty()) {
            throw new IllegalArgumentException("Invalid format for 'pinot.query-options'. Value provided is :" + options);
        }
        queryOptions.put(key, value);
    }
    return queryOptions.buildOrThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2800,5 +2800,16 @@ public void testQueryOptions()
" HAVING SUM(long_number) > 10000\""))
.matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
.isFullyPushedDown();
Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty("pinot", "query_options", "skipUpsert:true")
.build();
assertThat(query(queryOptions,"SELECT city, \"sum(long_number)\" FROM" +
" \"SET skipUpsert = true;\n" +
" SELECT city, SUM(long_number)" +
" FROM my_table" +
" GROUP BY city" +
" HAVING SUM(long_number) > 10000\""))
.matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
.isFullyPushedDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,16 @@ public void testQueryOptions()
String tableName = realtimeOnlyTable.getTableName();
String tableNameWithSuffix = tableName + REALTIME_SUFFIX;
String query = """
SET skipUpsert='true';
SET useMultistageEngine='true';
SET skipUpsert=true;
SET useMultistageEngine=true;
SELECT FlightNum
FROM %s
LIMIT 50;
""".formatted(tableNameWithSuffix);
DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER);
String expectedPql = """
SET skipUpsert = 'true';
SET useMultistageEngine = 'true';
SET useMultistageEngine = true;
SET skipUpsert = true;
SELECT "FlightNum" \
FROM %s \
LIMIT 50""".formatted(tableNameWithSuffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setProxyEnabled(false)
.setQueryOptions(null)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
}

Expand All @@ -68,6 +69,7 @@ public void testExplicitPropertyMappings()
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.put("pinot.query-options", "enableNullHandling:true,skipUpsert:false")
.buildOrThrow();

PinotConfig expected = new PinotConfig()
Expand All @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setProxyEnabled(true)
.setQueryOptions("enableNullHandling:true,skipUpsert:false")
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ public void testConnectionTimeoutParsedProperly()
.build();
assertThat(PinotSessionProperties.getConnectionTimeout(session)).isEqualTo(new Duration(0.25, TimeUnit.MINUTES));
}

@Test
public void testQueryOptionsParsing()
{
    // The option list given to the config becomes the default of the query_options session property.
    PinotSessionProperties properties = new PinotSessionProperties(
            new PinotConfig().setQueryOptions("enableNullHandling:true,skipUpsert:true,varcharOption:'value'"));
    ConnectorSession session = TestingConnectorSession.builder()
            .setPropertyMetadata(properties.getSessionProperties())
            .build();

    String expected = "SET enableNullHandling = true;\n" +
            "SET skipUpsert = true;\n" +
            "SET varcharOption = 'value';\n";
    assertThat(PinotSessionProperties.getQueryOptions(session).orElseThrow()).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;

final class TestPinotQueryBuilder
{
    // Plain key:value pairs are parsed into entries in the order they were listed.
    @Test
    public void testParseQueryOption()
    {
        Map<String, String> parsed = PinotQueryBuilder.parseQueryOptions(
                "limitForSegmentQueries:1000,limitForBrokerQueries:1000,targetSegmentPageSizeBytes:1000");

        assertThat(parsed).containsExactly(
                entry("limitForSegmentQueries", "1000"),
                entry("limitForBrokerQueries", "1000"),
                entry("targetSegmentPageSizeBytes", "1000"));
    }

    // Single quotes around a value are kept as part of the parsed value.
    @Test
    public void testParseQueryOptionWithQuotes()
    {
        Map<String, String> parsed = PinotQueryBuilder.parseQueryOptions(
                "enableNullHandling:true,skipUpsert:true,varcharOption:'value'");

        assertThat(parsed).containsExactly(
                entry("enableNullHandling", "true"),
                entry("skipUpsert", "true"),
                entry("varcharOption", "'value'"));
    }

    // An empty option string yields an empty map.
    @Test
    public void testParseQueryOptionWithEmptyString()
    {
        Map<String, String> parsed = PinotQueryBuilder.parseQueryOptions("");

        assertThat(parsed).isEmpty();
    }
}

0 comments on commit cb25080

Please sign in to comment.