Skip to content

Commit

Permalink
Upgrade Pinot Libraries to 1.1.0
Browse files Browse the repository at this point in the history
Co-authored-by: Elon Azoulay <[email protected]>
  • Loading branch information
wendigo and elonazoulay committed May 24, 2024
1 parent 937456b commit 5251eb1
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 167 deletions.
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/pinot.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The Pinot connector allows Trino to query data stored in

To connect to Pinot, you need:

- Pinot 0.11.0 or higher.
- Pinot 1.1.0 or higher.
- Network access from the Trino coordinator and workers to the Pinot controller
nodes. Port 8098 is the default port.

Expand Down
128 changes: 55 additions & 73 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.pinot.version>0.12.1</dep.pinot.version>
<dep.pinot.version>1.1.0</dep.pinot.version>
<!-- additional JVM flags required by chronicle-hft which is used by pinot-segment spi -->
<air.test.jvm.additional-arguments>${air.test.jvm.additional-arguments.default}
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED</air.test.jvm.additional-arguments>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.32.0</version>
<groupId>net.openhft</groupId>
<artifactId>posix</artifactId>
<version>2.25ea0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down Expand Up @@ -118,10 +121,15 @@
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<version>1.0.4</version>
<version>1.3.1</version>
<exclusions>
<exclusion>
<groupId>commons-io</groupId>
Expand All @@ -135,6 +143,10 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
Expand All @@ -148,10 +160,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand All @@ -164,14 +172,6 @@
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
Expand All @@ -192,14 +192,6 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand All @@ -216,6 +208,10 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>net.openhft</groupId>
<artifactId>chronicle-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-annotations</artifactId>
Expand All @@ -224,10 +220,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
Expand All @@ -244,10 +236,18 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down Expand Up @@ -280,10 +280,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand All @@ -292,26 +288,10 @@
<artifactId>pinot-core</artifactId>
<version>${dep.pinot.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
Expand All @@ -329,16 +309,16 @@
<artifactId>kafka_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-sandbox</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
Expand All @@ -364,6 +344,10 @@
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-grizzly2-http</artifactId>
</exclusion>
<exclusion>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand All @@ -379,22 +363,18 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-local</artifactId>
<version>${dep.pinot.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-spi</artifactId>
<version>${dep.pinot.version}</version>
<exclusions>
<exclusion>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -415,8 +395,8 @@
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -468,21 +448,23 @@
<scope>runtime</scope>
</dependency>

<!-- see https://chronicle.software/chronicle-support-java-17/ for note java options -->
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<groupId>net.openhft</groupId>
<artifactId>chronicle-core</artifactId>
<version>2.25ea15</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>runtime</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.pinot.PinotSessionProperties;
import io.trino.plugin.pinot.PinotSplit;
import io.trino.spi.connector.ConnectorSession;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
Expand All @@ -41,7 +42,6 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -218,12 +218,12 @@ public Iterator<PinotDataTableWithSize> queryPinot(ConnectorSession session, Str
throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e);
}
ServerInstance serverInstance = pinotHostMapper.getServerInstance(serverHost);
Map<ServerInstance, List<String>> routingTable = new HashMap<>();
routingTable.put(serverInstance, new ArrayList<>(segments));
Map<ServerInstance, Pair<List<String>, List<String>>> routingTable = new HashMap<>();
routingTable.put(serverInstance, Pair.of(segments, segments));
String tableName = brokerRequest.getQuerySource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
Map<ServerInstance, List<String>> offlineRoutingTable = TableNameBuilder.isOfflineTableResource(tableName) ? routingTable : null;
Map<ServerInstance, List<String>> realtimeRoutingTable = TableNameBuilder.isRealtimeTableResource(tableName) ? routingTable : null;
Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = TableNameBuilder.isOfflineTableResource(tableName) ? routingTable : null;
Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = TableNameBuilder.isRealtimeTableResource(tableName) ? routingTable : null;
BrokerRequest offlineBrokerRequest = TableNameBuilder.isOfflineTableResource(tableName) ? brokerRequest : null;
BrokerRequest realtimeBrokerRequest = TableNameBuilder.isRealtimeTableResource(tableName) ? brokerRequest : null;
AsyncQueryResponse asyncQueryResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.common.function.TransformFunctionType.DATETIMECONVERT;
import static org.apache.pinot.common.function.TransformFunctionType.DATETRUNC;
import static org.apache.pinot.common.function.TransformFunctionType.TIMECONVERT;
import static org.apache.pinot.common.function.TransformFunctionType.DATE_TIME_CONVERT;
import static org.apache.pinot.common.function.TransformFunctionType.DATE_TRUNC;
import static org.apache.pinot.common.function.TransformFunctionType.TIME_CONVERT;
import static org.apache.pinot.common.request.Literal.stringValue;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.FUNCTION;
import static org.apache.pinot.common.request.context.ExpressionContext.Type.IDENTIFIER;
Expand All @@ -80,9 +80,9 @@ private PinotExpressionRewriter() {}

static {
Map<TransformFunctionType, RewriteRule<FunctionContext>> functionMap = new HashMap<>();
functionMap.put(DATETIMECONVERT, new DateTimeConvertRewriteRule());
functionMap.put(TIMECONVERT, new TimeConvertRewriteRule());
functionMap.put(DATETRUNC, new DateTruncRewriteRule());
functionMap.put(DATE_TIME_CONVERT, new DateTimeConvertRewriteRule());
functionMap.put(TIME_CONVERT, new TimeConvertRewriteRule());
functionMap.put(DATE_TRUNC, new DateTruncRewriteRule());
FUNCTION_RULE_MAP = immutableEnumMap(functionMap);

Map<AggregationFunctionType, RewriteRule<FunctionContext>> aggregationFunctionMap = new HashMap<>();
Expand Down Expand Up @@ -160,7 +160,7 @@ private static class DateTimeConvertRewriteRule
@Override
public Pattern<FunctionContext> getPattern()
{
return transformFunction().with(transformFunctionType().equalTo(DATETIMECONVERT));
return transformFunction().with(transformFunctionType().equalTo(DATE_TIME_CONVERT));
}

@Override
Expand Down Expand Up @@ -193,7 +193,7 @@ private static class TimeConvertRewriteRule
@Override
public Pattern<FunctionContext> getPattern()
{
return transformFunction().with(transformFunctionType().equalTo(TIMECONVERT));
return transformFunction().with(transformFunctionType().equalTo(TIME_CONVERT));
}

@Override
Expand Down Expand Up @@ -224,7 +224,7 @@ private static class DateTruncRewriteRule
@Override
public Pattern<FunctionContext> getPattern()
{
return transformFunction().with(transformFunctionType().equalTo(DATETRUNC));
return transformFunction().with(transformFunctionType().equalTo(DATE_TRUNC));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,18 +624,13 @@ public String formatToSql(FunctionContext object, Captures captures, Context con
.map(expressionContext -> formatExpression(expressionContext, context))
.collect(toImmutableList());
checkState(arguments.size() >= 2, "Unexpected expression '%s'", object);
int whenStatements = arguments.size() / 2;
StringBuilder builder = new StringBuilder("CASE ");
builder.append("WHEN ")
.append(arguments.get(0))
.append(" THEN ")
.append(arguments.get(whenStatements));

for (int index = 1; index < whenStatements; index++) {
StringBuilder builder = new StringBuilder("CASE");

for (int index = 0; index < arguments.size() / 2; index++) {
builder.append(" WHEN ")
.append(arguments.get(index))
.append(arguments.get(index * 2))
.append(" THEN ")
.append(arguments.get(index + whenStatements));
.append(arguments.get(index * 2 + 1));
}

if (arguments.size() % 2 != 0) {
Expand Down
Loading

0 comments on commit 5251eb1

Please sign in to comment.