From f370b134c6c939956587f9019e471f9e42201ec4 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Wed, 15 Feb 2023 13:12:11 +0900 Subject: [PATCH] Support query pass through table function in Cassandra --- docs/src/main/sphinx/connector/cassandra.rst | 32 +++++ .../cassandra/CassandraClientModule.java | 4 + .../plugin/cassandra/CassandraConnector.java | 12 ++ .../plugin/cassandra/CassandraMetadata.java | 72 ++++++++-- .../CassandraNamedRelationHandle.java | 127 +++++++++++++++++ .../cassandra/CassandraPartitionManager.java | 2 +- .../CassandraQueryRelationHandle.java | 65 +++++++++ .../cassandra/CassandraRecordSetProvider.java | 6 +- .../cassandra/CassandraRelationHandle.java | 30 ++++ .../plugin/cassandra/CassandraSession.java | 6 +- .../cassandra/CassandraSplitManager.java | 19 ++- .../plugin/cassandra/CassandraTable.java | 6 +- .../cassandra/CassandraTableHandle.java | 85 ++++-------- .../io/trino/plugin/cassandra/ptf/Query.java | 130 ++++++++++++++++++ .../cassandra/util/CassandraCqlUtils.java | 10 +- .../cassandra/TestCassandraConnector.java | 4 +- .../cassandra/TestCassandraConnectorTest.java | 109 +++++++++++++++ .../cassandra/TestCassandraTableHandle.java | 17 ++- .../cassandra/TestJsonCassandraHandles.java | 36 ++--- ...assandraClusteringPredicatesExtractor.java | 4 +- 20 files changed, 661 insertions(+), 115 deletions(-) create mode 100644 plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java create mode 100644 plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraQueryRelationHandle.java create mode 100644 plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRelationHandle.java create mode 100644 plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java diff --git a/docs/src/main/sphinx/connector/cassandra.rst b/docs/src/main/sphinx/connector/cassandra.rst index b7c52f86da37..5f16b7546d13 100644 --- a/docs/src/main/sphinx/connector/cassandra.rst +++ b/docs/src/main/sphinx/connector/cassandra.rst @@ -393,6 +393,38 @@ statements, the connector supports the following features: * :doc:`/sql/create-table-as` * :doc:`/sql/drop-table` +Table functions +--------------- + +The connector provides specific :doc:`table functions ` to +access Cassandra. +.. _cassandra-query-function: + +``query(varchar) -> table`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``query`` function allows you to query the underlying Cassandra directly. It +requires syntax native to Cassandra, because the full query is pushed down and +processed by Cassandra. This can be useful for accessing native features which are +not available in Trino or for improving query performance in situations where +running a query natively may be faster. + +.. include:: polymorphic-table-function-ordering.fragment + +As a simple example, to select an entire table:: + + SELECT + * + FROM + TABLE( + example.system.query( + query => 'SELECT + * + FROM + tpch.nation' + ) + ); + DROP TABLE ^^^^^^^^^^ diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java index 20f6ba46d69b..0df0633156e5 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java @@ -26,7 +26,9 @@ import com.google.inject.Provides; import com.google.inject.Scopes; import io.airlift.json.JsonCodec; +import io.trino.plugin.cassandra.ptf.Query; import io.trino.spi.TrinoException; +import io.trino.spi.ptf.ConnectorTableFunction; import io.trino.spi.type.Type; import io.trino.spi.type.TypeId; import io.trino.spi.type.TypeManager; @@ -46,6 +48,7 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.airlift.json.JsonBinder.jsonBinder; import static io.airlift.json.JsonCodecBinder.jsonCodecBinder; @@ -77,6 +80,7 @@ public void configure(Binder binder) binder.bind(CassandraPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(CassandraPartitionManager.class).in(Scopes.SINGLETON); binder.bind(CassandraSessionProperties.class).in(Scopes.SINGLETON); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON); binder.bind(CassandraTypeManager.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(CassandraClientConfig.class); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnector.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnector.java index c7ab92cccde5..ca3d05f02f28 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnector.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnector.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.cassandra; +import com.google.common.collect.ImmutableSet; import io.airlift.bootstrap.LifeCycleManager; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorMetadata; @@ -21,12 +22,14 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.ConnectorTableFunction; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; import javax.inject.Inject; import java.util.List; +import java.util.Set; import static io.trino.spi.transaction.IsolationLevel.READ_UNCOMMITTED; import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports; @@ -40,6 +43,7 @@ public class CassandraConnector private final CassandraSplitManager splitManager; private final ConnectorRecordSetProvider recordSetProvider; private final ConnectorPageSinkProvider pageSinkProvider; + private final Set connectorTableFunctions; private final List> sessionProperties; @Inject @@ -49,6 +53,7 @@ public CassandraConnector( CassandraSplitManager splitManager, CassandraRecordSetProvider recordSetProvider, CassandraPageSinkProvider pageSinkProvider, + Set connectorTableFunctions, CassandraSessionProperties sessionProperties) { this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); @@ -56,6 +61,7 @@ public CassandraConnector( this.splitManager = requireNonNull(splitManager, "splitManager is null"); this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); + this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null")); this.sessionProperties = requireNonNull(sessionProperties.getSessionProperties(), "sessionProperties is null"); } @@ -90,6 +96,12 @@ public ConnectorPageSinkProvider getPageSinkProvider() return pageSinkProvider; } + @Override + public Set getTableFunctions() + { + return connectorTableFunctions; + } + @Override public List> getSessionProperties() { diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java index 749c0c17318c..d411cd4c75b8 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java @@ -13,10 +13,14 @@ */ package io.trino.plugin.cassandra; +import com.datastax.oss.driver.api.core.cql.ColumnDefinition; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; +import io.trino.plugin.cassandra.ptf.Query.QueryHandle; import io.trino.plugin.cassandra.util.CassandraCqlUtils; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -37,8 +41,10 @@ import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; +import io.trino.spi.connector.TableFunctionApplicationResult; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.type.Type; @@ -109,7 +115,7 @@ public CassandraTableHandle getTableHandle(ConnectorSession session, SchemaTable { requireNonNull(tableName, "tableName is null"); try { - return cassandraSession.getTable(tableName).getTableHandle(); + return new CassandraTableHandle(cassandraSession.getTable(tableName).getTableHandle()); } catch (TableNotFoundException | SchemaNotFoundException e) { // table was not found @@ -119,16 +125,32 @@ public CassandraTableHandle getTableHandle(ConnectorSession session, SchemaTable private static SchemaTableName getTableName(ConnectorTableHandle tableHandle) { - return ((CassandraTableHandle) tableHandle).getSchemaTableName(); + return ((CassandraTableHandle) tableHandle).getRequiredNamedRelation().getSchemaTableName(); } @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle) { requireNonNull(tableHandle, "tableHandle is null"); + CassandraTableHandle handle = (CassandraTableHandle) tableHandle; + if (handle.getRelationHandle() instanceof CassandraQueryRelationHandle queryRelationHandle) { + List columns = getColumnHandles(queryRelationHandle.getQuery()).stream() + .map(CassandraColumnHandle.class::cast) + .map(CassandraColumnHandle::getColumnMetadata) + .collect(toImmutableList()); + return new ConnectorTableMetadata(getSchemaTableName(handle), columns); + } return getTableMetadata(getTableName(tableHandle)); } + private static SchemaTableName getSchemaTableName(CassandraTableHandle handle) + { + return handle.isNamedRelation() + ? handle.getRequiredNamedRelation().getSchemaTableName() + // TODO (https://github.com/trinodb/trino/issues/6694) SchemaTableName should not be required for synthetic ConnectorTableHandle + : new SchemaTableName("_generated", "_generated_query"); + } + private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) { CassandraTable table = cassandraSession.getTable(tableName); @@ -206,9 +228,14 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable } @Override - public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint) + public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle connectorTableHandle, Constraint constraint) { - CassandraTableHandle handle = (CassandraTableHandle) tableHandle; + CassandraTableHandle tableHandle = (CassandraTableHandle) connectorTableHandle; + if (tableHandle.isSynthetic()) { + // filter pushdown currently not supported for passthrough query + return Optional.empty(); + } + CassandraNamedRelationHandle handle = tableHandle.getRequiredNamedRelation(); if (handle.getPartitions().isPresent() || !handle.getClusteringKeyPredicates().isEmpty()) { // TODO support repeated applyFilter return Optional.empty(); @@ -240,12 +267,12 @@ public Optional> applyFilter(C } return Optional.of( - new ConstraintApplicationResult<>(new CassandraTableHandle( + new ConstraintApplicationResult<>(new CassandraTableHandle(new CassandraNamedRelationHandle( handle.getSchemaName(), handle.getTableName(), Optional.of(partitionResult.getPartitions()), // TODO this should probably be AND-ed with handle.getClusteringKeyPredicates() - clusteringKeyPredicates), + clusteringKeyPredicates)), unenforcedConstraint, false)); } @@ -265,7 +292,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle throw new TrinoException(PERMISSION_DENIED, "DROP TABLE is disabled in this Cassandra catalog"); } - CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) tableHandle; + CassandraNamedRelationHandle cassandraTableHandle = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation(); if (cassandraSession.isMaterializedView(cassandraTableHandle.getSchemaTableName())) { throw new TrinoException(NOT_SUPPORTED, "Dropping materialized views not yet supported"); } @@ -339,7 +366,7 @@ public Optional finishCreateTable(ConnectorSession sess @Override public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle) { - CassandraTableHandle table = (CassandraTableHandle) tableHandle; + CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation(); cassandraSession.execute(truncate(validSchemaName(table.getSchemaName()), validTableName(table.getTableName())).build()); } @@ -350,7 +377,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); } - CassandraTableHandle table = (CassandraTableHandle) tableHandle; + CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation(); if (cassandraSession.isMaterializedView(table.getSchemaTableName())) { throw new TrinoException(NOT_SUPPORTED, "Inserting into materialized views not yet supported"); } @@ -410,7 +437,7 @@ public Optional applyDelete(ConnectorSession session, Conn @Override public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle) { - CassandraTableHandle handle = (CassandraTableHandle) deleteHandle; + CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation(); List partitions = handle.getPartitions() .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Deleting without partition key is not supported")); if (partitions.isEmpty()) { @@ -422,4 +449,29 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle } return OptionalLong.empty(); } + + @Override + public Optional> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle) + { + if (!(handle instanceof QueryHandle queryHandle)) { + return Optional.empty(); + } + + CassandraTableHandle tableHandle = queryHandle.getTableHandle(); + List columnHandles = getColumnHandles(((CassandraQueryRelationHandle) tableHandle.getRelationHandle()).getQuery()); + return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles)); + } + + public List getColumnHandles(String query) + { + PreparedStatement statement = cassandraSession.prepare(SimpleStatement.newInstance(query)); + int position = 0; + ImmutableList.Builder columnsBuilder = ImmutableList.builderWithExpectedSize(statement.getResultSetDefinitions().size()); + for (ColumnDefinition column : statement.getResultSetDefinitions()) { + CassandraType cassandraType = cassandraTypeManager.toCassandraType(column.getType()) + .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Unsupported type: " + column.getType())); + columnsBuilder.add(new CassandraColumnHandle(column.getName().asInternal(), position++, cassandraType, false, false, false, false)); + } + return columnsBuilder.build(); + } } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java new file mode 100644 index 000000000000..943621d4572a --- /dev/null +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.trino.spi.connector.SchemaTableName; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; + +public class CassandraNamedRelationHandle + extends CassandraRelationHandle +{ + private final String schemaName; + private final String tableName; + private final Optional> partitions; + private final String clusteringKeyPredicates; + + public CassandraNamedRelationHandle(String schemaName, String tableName) + { + this(schemaName, tableName, Optional.empty(), ""); + } + + @JsonCreator + public CassandraNamedRelationHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("partitions") Optional> partitions, + @JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates) + { + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.partitions = partitions.map(ImmutableList::copyOf); + this.clusteringKeyPredicates = requireNonNull(clusteringKeyPredicates, "clusteringKeyPredicates is null"); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public Optional> getPartitions() + { + return partitions; + } + + @JsonProperty + public String getClusteringKeyPredicates() + { + return clusteringKeyPredicates; + } + + public SchemaTableName getSchemaTableName() + { + return new SchemaTableName(schemaName, tableName); + } + + @Override + public int hashCode() + { + return Objects.hash(schemaName, tableName, partitions, clusteringKeyPredicates); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + CassandraNamedRelationHandle other = (CassandraNamedRelationHandle) obj; + return Objects.equals(this.schemaName, other.schemaName) && + Objects.equals(this.tableName, other.tableName) && + Objects.equals(this.partitions, other.partitions) && + Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates); + } + + @Override + public String toString() + { + String result = format("%s:%s", schemaName, tableName); + if (this.partitions.isPresent()) { + List partitions = this.partitions.get(); + result += format( + " %d partitions %s", + partitions.size(), + Stream.concat( + partitions.subList(0, Math.min(partitions.size(), 3)).stream(), + partitions.size() > 3 ? Stream.of("...") : Stream.of()) + .map(Object::toString) + .collect(joining(", ", "[", "]"))); + } + if (!clusteringKeyPredicates.isEmpty()) { + result += format(" constraint(%s)", clusteringKeyPredicates); + } + return result; + } +} diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraPartitionManager.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraPartitionManager.java index 8c9d0a4ddb10..d65e779c60d0 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraPartitionManager.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraPartitionManager.java @@ -54,7 +54,7 @@ public CassandraPartitionManager(CassandraSession cassandraSession, CassandraTyp this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null"); } - public CassandraPartitionResult getPartitions(CassandraTableHandle cassandraTableHandle, TupleDomain tupleDomain) + public CassandraPartitionResult getPartitions(CassandraNamedRelationHandle cassandraTableHandle, TupleDomain tupleDomain) { // TODO support repeated applyFilter checkArgument(cassandraTableHandle.getPartitions().isEmpty(), "getPartitions() currently does not take into account table handle's partitions"); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraQueryRelationHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraQueryRelationHandle.java new file mode 100644 index 000000000000..239dcee16173 --- /dev/null +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraQueryRelationHandle.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class CassandraQueryRelationHandle + extends CassandraRelationHandle +{ + private final String query; + + @JsonCreator + public CassandraQueryRelationHandle(@JsonProperty("query") String query) + { + this.query = requireNonNull(query, "query is null"); + } + + @JsonProperty + public String getQuery() + { + return query; + } + + @Override + public String toString() + { + return format("Query[%s]", query); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CassandraQueryRelationHandle that = (CassandraQueryRelationHandle) o; + return query.equals(that.query); + } + + @Override + public int hashCode() + { + return Objects.hash(query); + } +} diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRecordSetProvider.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRecordSetProvider.java index dd1b4fcaf531..a43b396df853 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRecordSetProvider.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRecordSetProvider.java @@ -55,7 +55,11 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS .map(column -> (CassandraColumnHandle) column) .collect(toList()); - String selectCql = CassandraCqlUtils.selectFrom(cassandraTable, cassandraColumns).asCql(); + if (cassandraTable.getRelationHandle() instanceof CassandraQueryRelationHandle queryRelationHandle) { + return new CassandraRecordSet(cassandraSession, cassandraTypeManager, queryRelationHandle.getQuery(), cassandraColumns); + } + + String selectCql = CassandraCqlUtils.selectFrom(cassandraTable.getRequiredNamedRelation(), cassandraColumns).asCql(); StringBuilder sb = new StringBuilder(selectCql); if (sb.charAt(sb.length() - 1) == ';') { sb.setLength(sb.length() - 1); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRelationHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRelationHandle.java new file mode 100644 index 000000000000..065d85dffb41 --- /dev/null +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraRelationHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CassandraNamedRelationHandle.class, name = "named"), + @JsonSubTypes.Type(value = CassandraQueryRelationHandle.class, name = "query"), +}) +public abstract class CassandraRelationHandle +{ + @Override + public abstract String toString(); +} diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java index 1517f414abca..74b9f06c1e41 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java @@ -270,7 +270,7 @@ public CassandraTable getTable(SchemaTableName schemaTableName) .sorted(comparing(CassandraColumnHandle::getOrdinalPosition)) .collect(toList()); - CassandraTableHandle tableHandle = new CassandraTableHandle(tableMeta.getKeyspace().asInternal(), tableMeta.getName().asInternal()); + CassandraNamedRelationHandle tableHandle = new CassandraNamedRelationHandle(tableMeta.getKeyspace().asInternal(), tableMeta.getName().asInternal()); return new CassandraTable(tableHandle, sortedColumnHandles); } @@ -464,7 +464,7 @@ public ResultSet execute(Statement statement) private Iterable queryPartitionKeysWithInClauses(CassandraTable table, List> filterPrefixes) { - CassandraTableHandle tableHandle = table.getTableHandle(); + CassandraNamedRelationHandle tableHandle = table.getTableHandle(); List partitionKeyColumns = table.getPartitionKeyColumns(); Select partitionKeys = selectDistinctFrom(tableHandle, partitionKeyColumns) @@ -476,7 +476,7 @@ private Iterable queryPartitionKeysWithInClauses(CassandraTable table, List private Iterable queryPartitionKeysLegacyWithMultipleQueries(CassandraTable table, List> filterPrefixes) { - CassandraTableHandle tableHandle = table.getTableHandle(); + CassandraNamedRelationHandle tableHandle = table.getTableHandle(); List partitionKeyColumns = table.getPartitionKeyColumns(); Set> filterCombinations = Sets.cartesianProduct(filterPrefixes); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplitManager.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplitManager.java index 5db9145ec922..1e17a0c8bab9 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplitManager.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplitManager.java @@ -78,12 +78,17 @@ public CassandraSplitManager( public ConnectorSplitSource getSplits( ConnectorTransactionHandle transaction, ConnectorSession session, - ConnectorTableHandle tableHandle, + ConnectorTableHandle connectorTableHandle, DynamicFilter dynamicFilter, Constraint constraint) { - CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) tableHandle; + CassandraTableHandle tableHandle = (CassandraTableHandle) connectorTableHandle; + if (tableHandle.isSynthetic()) { + return new FixedSplitSource(ImmutableList.of(new CassandraSplit("", "", ImmutableList.of()))); + } + + CassandraNamedRelationHandle cassandraTableHandle = tableHandle.getRequiredNamedRelation(); List partitions; String clusteringKeyPredicates; if (cassandraTableHandle.getPartitions().isPresent()) { @@ -97,7 +102,7 @@ public ConnectorSplitSource getSplits( } if (partitions.isEmpty()) { - log.debug("No partitions matched predicates for table %s", tableHandle); + log.debug("No partitions matched predicates for table %s", connectorTableHandle); return new FixedSplitSource(ImmutableList.of()); } @@ -107,17 +112,17 @@ public ConnectorSplitSource getSplits( if (cassandraPartition.isUnpartitioned() || cassandraPartition.isIndexedColumnPredicatePushdown()) { CassandraTable table = cassandraSession.getTable(cassandraTableHandle.getSchemaTableName()); List splits = getSplitsByTokenRange(table, cassandraPartition.getPartitionId(), getSplitsPerNode(session)); - log.debug("One partition matched predicates for table %s, creating %s splits by token ranges", tableHandle, splits.size()); + log.debug("One partition matched predicates for table %s, creating %s splits by token ranges", connectorTableHandle, splits.size()); return new FixedSplitSource(splits); } } List splits = getSplitsForPartitions(cassandraTableHandle, partitions, clusteringKeyPredicates); - log.debug("%s partitions matched predicates for table %s, creating %s splits", partitions.size(), tableHandle, splits.size()); + log.debug("%s partitions matched predicates for table %s, creating %s splits", partitions.size(), connectorTableHandle, splits.size()); return new FixedSplitSource(splits); } - private String extractClusteringKeyPredicates(CassandraPartitionResult partitionResult, CassandraTableHandle tableHandle, CassandraSession session) + private String extractClusteringKeyPredicates(CassandraPartitionResult partitionResult, CassandraNamedRelationHandle tableHandle, CassandraSession session) { if (partitionResult.isUnpartitioned()) { return ""; @@ -167,7 +172,7 @@ else if (tokenRange instanceof RandomTokenRange randomTokenRange) { return tokenExpression + " > " + startTokenValue + " AND " + tokenExpression + " <= " + endTokenValue; } - private List getSplitsForPartitions(CassandraTableHandle cassTableHandle, List partitions, String clusteringPredicates) + private List getSplitsForPartitions(CassandraNamedRelationHandle cassTableHandle, List partitions, String clusteringPredicates) { String schema = cassTableHandle.getSchemaName(); HostAddressFactory hostAddressFactory = new HostAddressFactory(); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTable.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTable.java index 697ee7d59822..2e6b02ea23e4 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTable.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTable.java @@ -24,10 +24,10 @@ public class CassandraTable { - private final CassandraTableHandle tableHandle; + private final CassandraNamedRelationHandle tableHandle; private final List columns; - public CassandraTable(CassandraTableHandle tableHandle, List columns) + public CassandraTable(CassandraNamedRelationHandle tableHandle, List columns) { this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); @@ -38,7 +38,7 @@ public List getColumns() return columns; } - public CassandraTableHandle getTableHandle() + public CassandraNamedRelationHandle getTableHandle() { return tableHandle; } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTableHandle.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTableHandle.java index 7617f1f8ef9e..eb27dacf73cb 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTableHandle.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraTableHandle.java @@ -14,79 +14,56 @@ package io.trino.plugin.cassandra; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.SchemaTableName; -import java.util.List; import java.util.Objects; -import java.util.Optional; -import java.util.stream.Stream; -import static java.lang.String.format; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; public class CassandraTableHandle implements ConnectorTableHandle { - private final String schemaName; - private final String tableName; - private final Optional> partitions; - private final String clusteringKeyPredicates; - - public CassandraTableHandle(String schemaName, String tableName) - { - this(schemaName, tableName, Optional.empty(), ""); - } + private final CassandraRelationHandle relationHandle; @JsonCreator - public CassandraTableHandle( - @JsonProperty("schemaName") String schemaName, - @JsonProperty("tableName") String tableName, - @JsonProperty("partitions") Optional> partitions, - @JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates) + public CassandraTableHandle(@JsonProperty("relationHandle") CassandraRelationHandle relationHandle) { - this.schemaName = requireNonNull(schemaName, "schemaName is null"); - this.tableName = requireNonNull(tableName, "tableName is null"); - this.partitions = partitions.map(ImmutableList::copyOf); - this.clusteringKeyPredicates = requireNonNull(clusteringKeyPredicates, "clusteringKeyPredicates is null"); + this.relationHandle = requireNonNull(relationHandle, "relationHandle is null"); } @JsonProperty - public String getSchemaName() + public CassandraRelationHandle getRelationHandle() { - return schemaName; + return relationHandle; } - @JsonProperty - public String getTableName() + @JsonIgnore + public CassandraNamedRelationHandle getRequiredNamedRelation() { - return tableName; + checkState(isNamedRelation(), "The table handle does not represent a named relation: %s", this); + return (CassandraNamedRelationHandle) relationHandle; } - @JsonProperty - public Optional> getPartitions() + @JsonIgnore + public boolean isSynthetic() { - return partitions; + return !isNamedRelation(); } - @JsonProperty - public String getClusteringKeyPredicates() + @JsonIgnore + public boolean isNamedRelation() { - return clusteringKeyPredicates; - } - - public SchemaTableName getSchemaTableName() - { - return new SchemaTableName(schemaName, tableName); + return relationHandle instanceof CassandraNamedRelationHandle; } @Override public int hashCode() { - return Objects.hash(schemaName, tableName, partitions, clusteringKeyPredicates); + return Objects.hash(relationHandle); } @Override @@ -99,30 +76,14 @@ public boolean equals(Object obj) return false; } CassandraTableHandle other = (CassandraTableHandle) obj; - return Objects.equals(this.schemaName, other.schemaName) && - Objects.equals(this.tableName, other.tableName) && - Objects.equals(this.partitions, other.partitions) && - Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates); + return Objects.equals(this.relationHandle, other.relationHandle); } @Override public String toString() { - String result = format("%s:%s", schemaName, tableName); - if (this.partitions.isPresent()) { - List partitions = this.partitions.get(); - result += format( - " %d partitions %s", - partitions.size(), - Stream.concat( - partitions.subList(0, Math.min(partitions.size(), 3)).stream(), - partitions.size() > 3 ? Stream.of("...") : Stream.of()) - .map(Object::toString) - .collect(joining(", ", "[", "]"))); - } - if (!clusteringKeyPredicates.isEmpty()) { - result += format(" constraint(%s)", clusteringKeyPredicates); - } - return result; + return toStringHelper(this) + .add("relationHandle", relationHandle) + .toString(); } } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java new file mode 100644 index 000000000000..a675312ac117 --- /dev/null +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra.ptf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorTableFunction; +import io.trino.plugin.cassandra.CassandraColumnHandle; +import io.trino.plugin.cassandra.CassandraMetadata; +import io.trino.plugin.cassandra.CassandraQueryRelationHandle; +import io.trino.plugin.cassandra.CassandraTableHandle; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.ptf.AbstractConnectorTableFunction; +import io.trino.spi.ptf.Argument; +import io.trino.spi.ptf.ConnectorTableFunction; +import io.trino.spi.ptf.ConnectorTableFunctionHandle; +import io.trino.spi.ptf.Descriptor; +import io.trino.spi.ptf.Descriptor.Field; +import io.trino.spi.ptf.ScalarArgument; +import io.trino.spi.ptf.ScalarArgumentSpecification; +import io.trino.spi.ptf.TableFunctionAnalysis; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class Query + implements Provider +{ + public static final String SCHEMA_NAME = "system"; + public static final String NAME = "query"; + + private final CassandraMetadata cassandraMetadata; + + @Inject + public Query(CassandraMetadata cassandraMetadata) + { + this.cassandraMetadata = requireNonNull(cassandraMetadata, "cassandraMetadata is null"); + } + + @Override + public ConnectorTableFunction get() + { + return new ClassLoaderSafeConnectorTableFunction(new QueryFunction(cassandraMetadata), getClass().getClassLoader()); + } + + public static class QueryFunction + extends AbstractConnectorTableFunction + { + private final CassandraMetadata cassandraMetadata; + + public QueryFunction(CassandraMetadata cassandraMetadata) + { + super( + SCHEMA_NAME, + NAME, + ImmutableList.of(ScalarArgumentSpecification.builder() + .name("QUERY") + .type(VARCHAR) + .build()), + GENERIC_TABLE); + this.cassandraMetadata = requireNonNull(cassandraMetadata, "metadata is null"); + } + + @Override + public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map arguments) + { + ScalarArgument argument = (ScalarArgument) getOnlyElement(arguments.values()); + String query = ((Slice) argument.getValue()).toStringUtf8(); + + CassandraQueryRelationHandle queryRelationHandle = new CassandraQueryRelationHandle(query); + List columnHandles = cassandraMetadata.getColumnHandles(query); + checkState(!columnHandles.isEmpty(), "Handle doesn't have columns info"); + Descriptor returnedType = new Descriptor(columnHandles.stream() + .map(CassandraColumnHandle.class::cast) + .map(column -> new Field(column.getName(), Optional.of(column.getType()))) + .collect(toImmutableList())); + + QueryHandle handle = new QueryHandle(new CassandraTableHandle(queryRelationHandle)); + + return TableFunctionAnalysis.builder() + .returnedType(returnedType) + .handle(handle) + .build(); + } + } + + public static class QueryHandle + implements ConnectorTableFunctionHandle + { + private final CassandraTableHandle tableHandle; + + @JsonCreator + public QueryHandle(@JsonProperty("tableHandle") CassandraTableHandle tableHandle) + { + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + } + + @JsonProperty + public CassandraTableHandle getTableHandle() + { + return tableHandle; + } + } +} diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java index 175202759cf1..82a878f3ea6f 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java @@ -19,8 +19,8 @@ import com.fasterxml.jackson.core.io.JsonStringEncoder; import com.google.common.collect.ImmutableList; import io.trino.plugin.cassandra.CassandraColumnHandle; +import io.trino.plugin.cassandra.CassandraNamedRelationHandle; import io.trino.plugin.cassandra.CassandraPartition; -import io.trino.plugin.cassandra.CassandraTableHandle; import io.trino.spi.connector.ColumnHandle; import java.util.ArrayList; @@ -108,20 +108,20 @@ public static List selection(List columns) .collect(toImmutableList()); } - public static Select selectFrom(CassandraTableHandle tableHandle, List columns) + public static Select selectFrom(CassandraNamedRelationHandle tableHandle, List columns) { SelectFrom selectFrom = from(tableHandle); return columns.isEmpty() ? selectFrom.all() : selectFrom.columns(selection(columns)); } - public static SelectFrom from(CassandraTableHandle tableHandle) + public static SelectFrom from(CassandraNamedRelationHandle tableHandle) { String schema = validSchemaName(tableHandle.getSchemaName()); String table = validTableName(tableHandle.getTableName()); return QueryBuilder.selectFrom(schema, table); } - public static Select selectDistinctFrom(CassandraTableHandle tableHandle, List columns) + public static Select selectDistinctFrom(CassandraNamedRelationHandle tableHandle, List columns) { SelectFrom selectFrom = from(tableHandle).distinct(); if (columns.isEmpty()) { @@ -146,7 +146,7 @@ private static String deleteFrom(String schemaName, String tableName, CassandraP schemaName, tableName, getWhereCondition(partition.getPartitionId(), clusteringKeyPredicates)); } - public static List getDeleteQueries(CassandraTableHandle handle) + public static List getDeleteQueries(CassandraNamedRelationHandle handle) { ImmutableList.Builder queries = ImmutableList.builder(); for (CassandraPartition partition : handle.getPartitions().orElse(ImmutableList.of())) { diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java index 847813e5e8a9..174f58d186d0 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java @@ -458,8 +458,8 @@ private static ImmutableMap indexColumns(List col private CassandraTableHandle getTableHandle(Optional> partitions, String clusteringKeyPredicates) { - CassandraTableHandle handle = (CassandraTableHandle) getTableHandle(tableForDelete); - return new CassandraTableHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates); + CassandraNamedRelationHandle handle = ((CassandraTableHandle) getTableHandle(tableForDelete)).getRequiredNamedRelation(); + return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates)); } private CassandraPartition createPartition(long value1, long value2) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java index ffd5e8f89211..734787e2d343 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -18,6 +18,7 @@ import com.google.common.primitives.Ints; import io.airlift.units.Duration; import io.trino.Session; +import io.trino.sql.planner.plan.FilterNode; import io.trino.testing.BaseConnectorTest; import io.trino.testing.Bytes; import io.trino.testing.MaterializedResult; @@ -26,6 +27,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.assertions.Assert; import io.trino.testing.sql.TestTable; +import org.intellij.lang.annotations.Language; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -72,6 +74,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public class TestCassandraConnectorTest extends BaseConnectorTest @@ -1384,6 +1387,107 @@ public void testRowLevelDelete() .hasStackTraceContaining("Delete without primary key or partition key is not supported"); } + // test polymorphic table function + + @Test + public void testNativeQuerySelectFromNation() + { + assertQuery( + "SELECT * FROM TABLE(cassandra.system.query(query => 'SELECT name FROM tpch.nation WHERE nationkey = 0 ALLOW FILTERING'))", + "VALUES 'ALGERIA'"); + assertQuery( + "SELECT name FROM TABLE(cassandra.system.query(query => 'SELECT * FROM tpch.nation WHERE nationkey = 0 ALLOW FILTERING'))", + "VALUES 'ALGERIA'"); + assertQuery( + "SELECT name FROM TABLE(cassandra.system.query(query => 'SELECT * FROM tpch.nation')) WHERE nationkey = 0", + "VALUES 'ALGERIA'"); + assertThat(query("SELECT * FROM TABLE(cassandra.system.query(query => 'SELECT * FROM tpch.nation')) WHERE nationkey = 0")) + .isNotFullyPushedDown(FilterNode.class); + } + + @Test + public void testNativeQuerySelectFromTestTable() + { + String tableName = "tpch.test_select" + randomNameSuffix(); + onCassandra("CREATE TABLE " + tableName + "(col BIGINT PRIMARY KEY)"); + + onCassandra("INSERT INTO " + tableName + "(col) VALUES (1)"); + assertQuery( + "SELECT * FROM TABLE(cassandra.system.query(query => 'SELECT * FROM " + tableName + "'))", + "VALUES 1"); + + onCassandra("DROP TABLE " + tableName); + } + + @Test + public void testNativeQueryCaseSensitivity() + { + String tableName = "tpch.test_case" + randomNameSuffix(); + onCassandra("CREATE TABLE " + tableName + "(col_case BIGINT PRIMARY KEY, \"COL_CASE\" BIGINT)"); + + onCassandra("INSERT INTO " + tableName + "(col_case, \"COL_CASE\") VALUES (1, 2)"); + assertQuery( + "SELECT * FROM TABLE(cassandra.system.query(query => 'SELECT * FROM " + tableName + "'))", + "VALUES (1, 2)"); + + onCassandra("DROP TABLE " + tableName); + } + + @Test + public void testNativeQueryCreateTableFailure() + { + String tableName = "test_create" + randomNameSuffix(); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'CREATE TABLE tpch." + tableName + "(col INT PRIMARY KEY)'))")) + .hasMessage("Handle doesn't have columns info"); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + } + + @Test + public void testNativeQueryPreparingStatementFailure() + { + String tableName = "test_insert" + randomNameSuffix(); + assertFalse(getQueryRunner().tableExists(getSession(), tableName)); + assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'INSERT INTO tpch." + tableName + "(col) VALUES (1)'))")) + .hasMessageContaining("unconfigured table"); + } + + @Test + public void testNativeQueryUnsupportedStatement() + { + String tableName = "test_unsupported_statement" + randomNameSuffix(); + onCassandra("CREATE TABLE tpch." + tableName + "(col INT PRIMARY KEY)"); + onCassandra("INSERT INTO tpch." + tableName + "(col) VALUES (1)"); + + assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'INSERT INTO tpch." + tableName + "(col) VALUES (3)'))")) + .hasMessage("Handle doesn't have columns info"); + assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'DELETE FROM tpch." + tableName + " WHERE col = 1'))")) + .hasMessage("Handle doesn't have columns info"); + + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + + onCassandra("DROP TABLE IF EXISTS tpch." + tableName); + } + + @Test + public void testNativeQueryUnsupportedType() + { + String tableName = "test_unsupported_type" + randomNameSuffix(); + onCassandra("CREATE TABLE tpch." + tableName + "(col TIME PRIMARY KEY)"); + + assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'SELECT * FROM tpch." + tableName + "'))")) + .hasMessage("Unsupported type: TIME"); + + onCassandra("DROP TABLE IF EXISTS tpch." + tableName); + } + + @Test + public void testNativeQueryIncorrectSyntax() + { + assertThatThrownBy(() -> query("SELECT * FROM TABLE(system.query(query => 'some wrong syntax'))")) + .hasMessageContaining("no viable alternative at input 'some'"); + } + @Override protected OptionalInt maxColumnNameLength() { @@ -1491,4 +1595,9 @@ private TestCassandraTable testTable(String namePrefix, List codec = JsonCodec.jsonCodec(CassandraTableHandle.class); @Test - public void testRoundTrip() + public void testRoundTripNamedRelationHandle() { - CassandraTableHandle expected = new CassandraTableHandle("schema", "table"); + CassandraTableHandle expected = new CassandraTableHandle(new CassandraNamedRelationHandle("schema", "table")); String json = codec.toJson(expected); CassandraTableHandle actual = codec.fromJson(json); - assertEquals(actual.getSchemaTableName(), expected.getSchemaTableName()); + assertEquals(actual.getRequiredNamedRelation().getSchemaTableName(), expected.getRequiredNamedRelation().getSchemaTableName()); + } + + @Test + public void testRoundTripQueryRelationHandle() + { + CassandraTableHandle expected = new CassandraTableHandle(new CassandraQueryRelationHandle("SELECT * FROM tpch.region")); + + String json = codec.toJson(expected); + CassandraTableHandle actual = codec.fromJson(json); + + assertEquals(actual.getRelationHandle(), new CassandraQueryRelationHandle("SELECT * FROM tpch.region")); } } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java index 8008f7a86db4..28882db335e8 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestJsonCassandraHandles.java @@ -37,20 +37,24 @@ public class TestJsonCassandraHandles { private static final Map TABLE_HANDLE_AS_MAP = ImmutableMap.of( - "schemaName", "cassandra_schema", - "tableName", "cassandra_table", - "clusteringKeyPredicates", ""); + "relationHandle", ImmutableMap.of( + "@type", "named", + "schemaName", "cassandra_schema", + "tableName", "cassandra_table", + "clusteringKeyPredicates", "")); private static final Map TABLE2_HANDLE_AS_MAP = ImmutableMap.of( - "schemaName", "cassandra_schema", - "tableName", "cassandra_table", - "partitions", List.of( - ImmutableMap.of( - "key", "a2V5", - "partitionId", "partitionKey1 = 11 AND partitionKey2 = 22", - "tupleDomain", ImmutableMap.of("columnDomains", Collections.emptyList()), - "indexedColumnPredicatePushdown", true)), - "clusteringKeyPredicates", "clusteringKey1 = 33"); + "relationHandle", ImmutableMap.of( + "@type", "named", + "schemaName", "cassandra_schema", + "tableName", "cassandra_table", + "partitions", List.of( + ImmutableMap.of( + "key", "a2V5", + "partitionId", "partitionKey1 = 11 AND partitionKey2 = 22", + "tupleDomain", ImmutableMap.of("columnDomains", Collections.emptyList()), + "indexedColumnPredicatePushdown", true)), + "clusteringKeyPredicates", "clusteringKey1 = 33")); private static final Map COLUMN_HANDLE_AS_MAP = ImmutableMap.builder() .put("name", "column") @@ -97,7 +101,7 @@ public class TestJsonCassandraHandles public void testTableHandleSerialize() throws Exception { - CassandraTableHandle tableHandle = new CassandraTableHandle("cassandra_schema", "cassandra_table"); + CassandraTableHandle tableHandle = new CassandraTableHandle(new CassandraNamedRelationHandle("cassandra_schema", "cassandra_table")); assertTrue(OBJECT_MAPPER.canSerialize(CassandraTableHandle.class)); String json = OBJECT_MAPPER.writeValueAsString(tableHandle); @@ -108,7 +112,7 @@ public void testTableHandleSerialize() public void testTable2HandleSerialize() throws Exception { - CassandraTableHandle tableHandle = new CassandraTableHandle("cassandra_schema", "cassandra_table", PARTITIONS, "clusteringKey1 = 33"); + CassandraTableHandle tableHandle = new CassandraTableHandle(new CassandraNamedRelationHandle("cassandra_schema", "cassandra_table", PARTITIONS, "clusteringKey1 = 33")); assertTrue(OBJECT_MAPPER.canSerialize(CassandraTableHandle.class)); String json = OBJECT_MAPPER.writeValueAsString(tableHandle); testJsonEquals(json, TABLE2_HANDLE_AS_MAP); @@ -120,7 +124,7 @@ public void testTableHandleDeserialize() { String json = OBJECT_MAPPER.writeValueAsString(TABLE_HANDLE_AS_MAP); - CassandraTableHandle tableHandle = OBJECT_MAPPER.readValue(json, CassandraTableHandle.class); + CassandraNamedRelationHandle tableHandle = (OBJECT_MAPPER.readValue(json, CassandraTableHandle.class)).getRequiredNamedRelation(); assertEquals(tableHandle.getSchemaName(), "cassandra_schema"); assertEquals(tableHandle.getTableName(), "cassandra_table"); @@ -134,7 +138,7 @@ public void testTable2HandleDeserialize() { String json = OBJECT_MAPPER.writeValueAsString(TABLE2_HANDLE_AS_MAP); - CassandraTableHandle tableHandle = OBJECT_MAPPER.readValue(json, CassandraTableHandle.class); + CassandraNamedRelationHandle tableHandle = (OBJECT_MAPPER.readValue(json, CassandraTableHandle.class)).getRequiredNamedRelation(); assertEquals(tableHandle.getSchemaName(), "cassandra_schema"); assertEquals(tableHandle.getTableName(), "cassandra_table"); diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/util/TestCassandraClusteringPredicatesExtractor.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/util/TestCassandraClusteringPredicatesExtractor.java index c7d2514813fa..3570cfb64ecc 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/util/TestCassandraClusteringPredicatesExtractor.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/util/TestCassandraClusteringPredicatesExtractor.java @@ -18,8 +18,8 @@ import com.google.common.collect.ImmutableMap; import io.trino.plugin.cassandra.CassandraClusteringPredicatesExtractor; import io.trino.plugin.cassandra.CassandraColumnHandle; +import io.trino.plugin.cassandra.CassandraNamedRelationHandle; import io.trino.plugin.cassandra.CassandraTable; -import io.trino.plugin.cassandra.CassandraTableHandle; import io.trino.plugin.cassandra.CassandraTypes; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.predicate.Domain; @@ -49,7 +49,7 @@ public void setUp() col4 = new CassandraColumnHandle("clusteringKe3", 4, CassandraTypes.BIGINT, false, true, false, false); cassandraTable = new CassandraTable( - new CassandraTableHandle("test", "records"), ImmutableList.of(col1, col2, col3, col4)); + new CassandraNamedRelationHandle("test", "records"), ImmutableList.of(col1, col2, col3, col4)); cassandraVersion = Version.parse("2.1.5"); }