Support query pass through table function in Cassandra
ebyhr committed Feb 15, 2023
1 parent 6c10805 commit f370b13
Showing 20 changed files with 661 additions and 115 deletions.
32 changes: 32 additions & 0 deletions docs/src/main/sphinx/connector/cassandra.rst
@@ -393,6 +393,38 @@ statements, the connector supports the following features:
* :doc:`/sql/create-table-as`
* :doc:`/sql/drop-table`

Table functions
---------------

The connector provides specific :doc:`table functions </functions/table>` to
access Cassandra.

.. _cassandra-query-function:

``query(varchar) -> table``
^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``query`` function allows you to query the underlying Cassandra cluster
directly. It requires syntax native to Cassandra, because the full query is
pushed down and processed by Cassandra. This can be useful for accessing native
features that are not available in Trino, or for improving query performance in
situations where running a query natively may be faster.

.. include:: polymorphic-table-function-ordering.fragment

As a simple example, to select an entire table::

    SELECT
      *
    FROM
      TABLE(
        example.system.query(
          query => 'SELECT
                      *
                    FROM
                    tpch.nation'
        )
      );
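
For example, native CQL features with no direct Trino equivalent, such as the
``writetime`` function, can be used inside the pass-through query. The
following is a hypothetical sketch, assuming the ``tpch.nation`` table has a
regular ``comment`` column::

    SELECT
      *
    FROM
      TABLE(
        example.system.query(
          query => 'SELECT
                      name,
                      writetime(comment)
                    FROM
                    tpch.nation'
        )
      );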

DROP TABLE
^^^^^^^^^^

plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java
@@ -26,7 +26,9 @@
import com.google.inject.Provides;
import com.google.inject.Scopes;
import io.airlift.json.JsonCodec;
import io.trino.plugin.cassandra.ptf.Query;
import io.trino.spi.TrinoException;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeId;
import io.trino.spi.type.TypeManager;
@@ -46,6 +48,7 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
@@ -77,6 +80,7 @@ public void configure(Binder binder)
binder.bind(CassandraPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(CassandraPartitionManager.class).in(Scopes.SINGLETON);
binder.bind(CassandraSessionProperties.class).in(Scopes.SINGLETON);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
binder.bind(CassandraTypeManager.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(CassandraClientConfig.class);
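
The Query class bound above (io.trino.plugin.cassandra.ptf.Query) is among the 20 changed files but is not rendered in this excerpt. Below is a condensed sketch of the provider pattern such a pass-through table function typically follows in Trino, modeled on the analogous query functions in other connectors: the QueryHandle and CassandraQueryRelationHandle names match their usage in the diff, while the builder calls and the exact analyze signature are assumptions, not lines from this commit.

import io.airlift.slice.Slice;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.AbstractConnectorTableFunction;
import io.trino.spi.ptf.Argument;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.ptf.Descriptor;
import io.trino.spi.ptf.ScalarArgument;
import io.trino.spi.ptf.ScalarArgumentSpecification;
import io.trino.spi.ptf.TableFunctionAnalysis;

import javax.inject.Inject;
import javax.inject.Provider;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.ptf.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class Query
        implements Provider<ConnectorTableFunction>
{
    public static final String SCHEMA_NAME = "system";
    public static final String NAME = "query";

    private final CassandraMetadata metadata;

    @Inject
    public Query(CassandraMetadata metadata)
    {
        this.metadata = requireNonNull(metadata, "metadata is null");
    }

    @Override
    public ConnectorTableFunction get()
    {
        return new QueryFunction(metadata);
    }

    public static class QueryFunction
            extends AbstractConnectorTableFunction
    {
        private final CassandraMetadata metadata;

        public QueryFunction(CassandraMetadata metadata)
        {
            super(SCHEMA_NAME, NAME,
                    List.of(ScalarArgumentSpecification.builder()
                            .name("QUERY")
                            .type(VARCHAR)
                            .build()),
                    GENERIC_TABLE);
            this.metadata = requireNonNull(metadata, "metadata is null");
        }

        @Override
        public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map<String, Argument> arguments)
        {
            // The raw CQL text becomes the synthetic relation inside the table handle
            String query = ((Slice) ((ScalarArgument) arguments.get("QUERY")).getValue()).toStringUtf8();
            CassandraTableHandle tableHandle = new CassandraTableHandle(new CassandraQueryRelationHandle(query));

            // Column names and types are derived by preparing the statement against
            // Cassandra (see getColumnHandles in CassandraMetadata below)
            Descriptor returnedType = new Descriptor(metadata.getColumnHandles(query).stream()
                    .map(CassandraColumnHandle.class::cast)
                    .map(column -> new Descriptor.Field(column.getName(), Optional.of(column.getCassandraType().getTrinoType())))
                    .collect(toImmutableList()));

            return TableFunctionAnalysis.builder()
                    .returnedType(returnedType)
                    .handle(new QueryHandle(tableHandle))
                    .build();
        }
    }

    public static class QueryHandle
            implements ConnectorTableFunctionHandle
    {
        private final CassandraTableHandle tableHandle;

        public QueryHandle(CassandraTableHandle tableHandle)
        {
            this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
        }

        public CassandraTableHandle getTableHandle()
        {
            return tableHandle;
        }
    }
}
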
plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnector.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.cassandra;

import com.google.common.collect.ImmutableSet;
import io.airlift.bootstrap.LifeCycleManager;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
@@ -21,12 +22,14 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

import javax.inject.Inject;

import java.util.List;
import java.util.Set;

import static io.trino.spi.transaction.IsolationLevel.READ_UNCOMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
@@ -40,6 +43,7 @@ public class CassandraConnector
private final CassandraSplitManager splitManager;
private final ConnectorRecordSetProvider recordSetProvider;
private final ConnectorPageSinkProvider pageSinkProvider;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -49,13 +53,15 @@ public CassandraConnector(
CassandraSplitManager splitManager,
CassandraRecordSetProvider recordSetProvider,
CassandraPageSinkProvider pageSinkProvider,
Set<ConnectorTableFunction> connectorTableFunctions,
CassandraSessionProperties sessionProperties)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null"));
this.sessionProperties = requireNonNull(sessionProperties.getSessionProperties(), "sessionProperties is null");
}

@@ -90,6 +96,12 @@ public ConnectorPageSinkProvider getPageSinkProvider()
return pageSinkProvider;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java
@@ -13,10 +13,14 @@
*/
package io.trino.plugin.cassandra;

import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.trino.plugin.cassandra.ptf.Query.QueryHandle;
import io.trino.plugin.cassandra.util.CassandraCqlUtils;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
@@ -37,8 +41,10 @@
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.Type;

@@ -109,7 +115,7 @@ public CassandraTableHandle getTableHandle(ConnectorSession session, SchemaTable
{
requireNonNull(tableName, "tableName is null");
try {
return cassandraSession.getTable(tableName).getTableHandle();
return new CassandraTableHandle(cassandraSession.getTable(tableName).getTableHandle());
}
catch (TableNotFoundException | SchemaNotFoundException e) {
// table was not found
@@ -119,16 +125,32 @@

private static SchemaTableName getTableName(ConnectorTableHandle tableHandle)
{
return ((CassandraTableHandle) tableHandle).getSchemaTableName();
return ((CassandraTableHandle) tableHandle).getRequiredNamedRelation().getSchemaTableName();
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
{
requireNonNull(tableHandle, "tableHandle is null");
CassandraTableHandle handle = (CassandraTableHandle) tableHandle;
if (handle.getRelationHandle() instanceof CassandraQueryRelationHandle queryRelationHandle) {
List<ColumnMetadata> columns = getColumnHandles(queryRelationHandle.getQuery()).stream()
.map(CassandraColumnHandle.class::cast)
.map(CassandraColumnHandle::getColumnMetadata)
.collect(toImmutableList());
return new ConnectorTableMetadata(getSchemaTableName(handle), columns);
}
return getTableMetadata(getTableName(tableHandle));
}

private static SchemaTableName getSchemaTableName(CassandraTableHandle handle)
{
return handle.isNamedRelation()
? handle.getRequiredNamedRelation().getSchemaTableName()
// TODO (https://github.com/trinodb/trino/issues/6694) SchemaTableName should not be required for synthetic ConnectorTableHandle
: new SchemaTableName("_generated", "_generated_query");
}
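
// CassandraNamedRelationHandle and CassandraQueryRelationHandle are new classes in this
// commit that are not rendered in this excerpt. Judging from their usage in this file,
// CassandraTableHandle is now a thin wrapper around a relation handle that is either a
// named table (schema/table name, partitions, clustering-key predicates) or a synthetic
// relation carrying the raw CQL of a pass-through query; see the sketch after this class.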

private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
{
CassandraTable table = cassandraSession.getTable(tableName);
@@ -206,9 +228,14 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
}

@Override
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle connectorTableHandle, Constraint constraint)
{
CassandraTableHandle handle = (CassandraTableHandle) tableHandle;
CassandraTableHandle tableHandle = (CassandraTableHandle) connectorTableHandle;
if (tableHandle.isSynthetic()) {
// filter pushdown currently not supported for passthrough query
return Optional.empty();
}
CassandraNamedRelationHandle handle = tableHandle.getRequiredNamedRelation();
if (handle.getPartitions().isPresent() || !handle.getClusteringKeyPredicates().isEmpty()) {
// TODO support repeated applyFilter
return Optional.empty();
@@ -240,12 +267,12 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
}

return Optional.of(
new ConstraintApplicationResult<>(new CassandraTableHandle(
new ConstraintApplicationResult<>(new CassandraTableHandle(new CassandraNamedRelationHandle(
handle.getSchemaName(),
handle.getTableName(),
Optional.of(partitionResult.getPartitions()),
// TODO this should probably be AND-ed with handle.getClusteringKeyPredicates()
clusteringKeyPredicates),
clusteringKeyPredicates)),
unenforcedConstraint,
false));
}
@@ -265,7 +292,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
throw new TrinoException(PERMISSION_DENIED, "DROP TABLE is disabled in this Cassandra catalog");
}

CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) tableHandle;
CassandraNamedRelationHandle cassandraTableHandle = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
if (cassandraSession.isMaterializedView(cassandraTableHandle.getSchemaTableName())) {
throw new TrinoException(NOT_SUPPORTED, "Dropping materialized views not yet supported");
}
@@ -339,7 +366,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
@Override
public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
CassandraTableHandle table = (CassandraTableHandle) tableHandle;
CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
cassandraSession.execute(truncate(validSchemaName(table.getSchemaName()), validTableName(table.getTableName())).build());
}

@@ -350,7 +377,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

CassandraTableHandle table = (CassandraTableHandle) tableHandle;
CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
if (cassandraSession.isMaterializedView(table.getSchemaTableName())) {
throw new TrinoException(NOT_SUPPORTED, "Inserting into materialized views not yet supported");
}
@@ -410,7 +437,7 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
{
CassandraTableHandle handle = (CassandraTableHandle) deleteHandle;
CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation();
List<CassandraPartition> partitions = handle.getPartitions()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Deleting without partition key is not supported"));
if (partitions.isEmpty()) {
@@ -422,4 +449,29 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
}
return OptionalLong.empty();
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
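// The engine invokes applyTableFunction during query planning, replacing the
// table function call with an ordinary table scan over the returned handle.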
if (!(handle instanceof QueryHandle queryHandle)) {
return Optional.empty();
}

CassandraTableHandle tableHandle = queryHandle.getTableHandle();
List<ColumnHandle> columnHandles = getColumnHandles(((CassandraQueryRelationHandle) tableHandle.getRelationHandle()).getQuery());
return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

public List<ColumnHandle> getColumnHandles(String query)
{
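// Prepare the statement without executing it; the Cassandra driver exposes
// result set metadata for prepared statements, which is mapped to Trino
// column handles below.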
PreparedStatement statement = cassandraSession.prepare(SimpleStatement.newInstance(query));
int position = 0;
ImmutableList.Builder<ColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(statement.getResultSetDefinitions().size());
for (ColumnDefinition column : statement.getResultSetDefinitions()) {
CassandraType cassandraType = cassandraTypeManager.toCassandraType(column.getType())
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Unsupported type: " + column.getType()));
columnsBuilder.add(new CassandraColumnHandle(column.getName().asInternal(), position++, cassandraType, false, false, false, false));
}
return columnsBuilder.build();
}
}
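
The synthetic relation handle itself is also among the files not rendered here. A minimal sketch consistent with its usage above follows; the JSON wiring and the CassandraRelationHandle supertype name are assumptions:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static java.util.Objects.requireNonNull;

public class CassandraQueryRelationHandle
        implements CassandraRelationHandle
{
    private final String query;

    @JsonCreator
    public CassandraQueryRelationHandle(@JsonProperty("query") String query)
    {
        this.query = requireNonNull(query, "query is null");
    }

    @JsonProperty
    public String getQuery()
    {
        return query;
    }
}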