feat: Add Flight SQL server support (#6023)
This adds [Arrow Flight
SQL](https://arrow.apache.org/docs/format/FlightSql.html) support to the
Deephaven server, with the goal of supporting the various SQL drivers
built on top of Flight SQL (JDBC, ADBC, ODBC). It is limited to query
statements (i.e., no UPDATEs). The implementation supports ad-hoc query
statements and prepared statements that carry no parameters, but not
parameterized prepared statements. Queries are able to reference tables
from the global scope by name; we might be able to expand support to
other resolvers in the future, potentially with catalogs and namespaces.
The scope of supported queries is guided by our SQL engine
`io.deephaven.engine.sql.Sql`, which is based on a Calcite query parser.
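
For illustration, here is roughly how a client would exercise that surface
(a hedged sketch; `flightSqlClient` is constructed as shown in the README
below, and `my_table` is a placeholder for a table in the server's global
scope):

```java
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.sql.FlightSqlClient;

// Supported: an ad-hoc query statement referencing a global-scope table by name.
FlightInfo adHoc = flightSqlClient.execute("SELECT Sym, SUM(Size) FROM my_table GROUP BY Sym");

// Supported: a prepared statement that carries no parameters.
try (FlightSqlClient.PreparedStatement prepared = flightSqlClient.prepare("SELECT * FROM my_table")) {
    FlightInfo info = prepared.execute();
}

// Not supported: parameterized prepared statements, e.g. binding values to
// flightSqlClient.prepare("SELECT * FROM my_table WHERE Size > ?").
```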

It is important to note that while the Flight SQL implementation
respects
`io.deephaven.server.session.TicketResolver.Authorization.transform`, it
is **not** hooked up to `io.deephaven.auth.ServiceAuthWiring` nor
`io.deephaven.server.table.validation.ColumnExpressionValidator`. So
while Flight SQL does not allow users to execute scripts, it neither
limits the table operations a user may perform nor restricts the
calling of arbitrary functions from a formula. As such, the security
posture of Flight SQL sits somewhere between "full access" and "limited
access".

A cookie-based authentication extension has been added to support some
Flight SQL clients that don't operate via the normal Flight
authentication "Authorization" header (yes, it's a misnomer), and
instead expect the server to send "Set-Cookie" to the clients and for
the clients to echo the cookie(s) back via the "Cookie" header. The
server will only send the authentication token as a cookie when the
client sends the header "x-deephaven-auth-cookie-request" with the
value "true". To support this, the `io.grpc.Context` has been captured
and preserved during the export submission logic.
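
For clients driving Flight directly, opting in might look like the
following (a sketch under assumptions: the header name comes from this
change, while the call-option plumbing is standard Arrow Flight):

```java
import org.apache.arrow.flight.FlightCallHeaders;
import org.apache.arrow.flight.HeaderCallOption;

// Ask the server to return the auth token via "Set-Cookie" on the response.
FlightCallHeaders headers = new FlightCallHeaders();
headers.insert("x-deephaven-auth-cookie-request", "true");
HeaderCallOption cookieOptIn = new HeaderCallOption(headers);
// Pass `cookieOptIn` along with Flight / Flight SQL calls, e.g.
// flightSqlClient.execute(query, cookieOptIn).
```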

The full Flight SQL action and command spectrum has been stubbed out
with appropriate "unimplemented" messages, in anticipation that we will
want to expand the scope of the Flight SQL implementation in the future.
It also serves as a more explicit guide to readers of the code for what
is and is not supported.
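
Each stub fails fast along these lines (an illustrative sketch, not the
literal server code; the command named in the message is one example of
many):

```java
import org.apache.arrow.flight.CallStatus;

// Reject an unsupported Flight SQL command with an explicit gRPC status.
throw CallStatus.UNIMPLEMENTED
        .withDescription("FlightSQL command not supported: CommandGetXdbcTypeInfo")
        .toRuntimeException();
```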

Fixes #5339

---------

Co-authored-by: jianfengmao <[email protected]>
devinrsmith and jmao-denver authored Nov 21, 2024
1 parent 8df9655 commit 064b7bf
Showing 61 changed files with 5,855 additions and 178 deletions.
32 changes: 17 additions & 15 deletions engine/sql/src/main/java/io/deephaven/engine/sql/Sql.java
@@ -19,17 +19,20 @@
import io.deephaven.qst.table.TableHeader.Builder;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.type.Type;
import io.deephaven.sql.Scope;
import io.deephaven.sql.ScopeStaticImpl;
import io.deephaven.sql.SqlAdapter;
import io.deephaven.sql.TableInformation;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.util.annotations.ScriptApi;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;

/**
* Experimental SQL execution. Subject to change.
@@ -47,35 +50,38 @@ public static TableSpec dryRun(String sql) {
return dryRun(sql, currentScriptSessionNamedTables());
}

@InternalUseOnly
public static TableSpec parseSql(String sql, Map<String, Table> scope, Function<String, TicketTable> ticketFunction,
Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out, ticketFunction));
}

private static Table evaluate(String sql, Map<String, Table> scope) {
final Map<TicketTable, Table> map = new HashMap<>(scope.size());
final TableSpec tableSpec = parseSql(sql, scope, map);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, map);
log.debug().append("Executing. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec.logic().create(new TableCreatorTicketInterceptor(TableCreatorImpl.INSTANCE, map));
}

private static TableSpec dryRun(String sql, Map<String, Table> scope) {
final TableSpec tableSpec = parseSql(sql, scope, null);
final TableSpec tableSpec = parseSql(sql, scope, Sql::sqlref, null);
log.info().append("Dry run. Graphviz representation:").nl().append(ToGraphvizDot.INSTANCE, tableSpec).endl();
return tableSpec;
}

private static TableSpec parseSql(String sql, Map<String, Table> scope, Map<TicketTable, Table> out) {
return SqlAdapter.parseSql(sql, scope(scope, out));
}

private static TicketTable sqlref(String tableName) {
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
return TicketTable.of(("sqlref/" + tableName).getBytes(StandardCharsets.UTF_8));
}

private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out) {
private static Scope scope(Map<String, Table> scope, Map<TicketTable, Table> out,
Function<String, TicketTable> ticketFunction) {
final ScopeStaticImpl.Builder builder = ScopeStaticImpl.builder();
for (Entry<String, Table> e : scope.entrySet()) {
final String tableName = e.getKey();
final Table table = e.getValue();
// The TicketTable can technically be anything unique (incrementing number, random, ...), but for
// visualization purposes it makes sense to use the (already unique) table name.
final TicketTable spec = sqlref(tableName);
final TicketTable spec = ticketFunction.apply(tableName);
final List<String> qualifiedName = List.of(tableName);
final TableHeader header = adapt(table.getDefinition());
builder.addTables(TableInformation.of(qualifiedName, header, spec));
@@ -103,11 +109,7 @@ private static TableHeader adapt(TableDefinition tableDef) {
}

private static ColumnHeader<?> adapt(ColumnDefinition<?> columnDef) {
if (columnDef.getComponentType() == null) {
return ColumnHeader.of(columnDef.getName(), columnDef.getDataType());
}
// SQLTODO(array-type)
throw new UnsupportedOperationException("SQLTODO(array-type)");
return ColumnHeader.of(columnDef.getName(), Type.find(columnDef.getDataType()));
}

private enum ToGraphvizDot implements ObjFormatter<TableSpec> {
@@ -5,23 +5,17 @@

import io.deephaven.engine.table.Table;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.table.EmptyTable;
import io.deephaven.qst.table.InputTable;
import io.deephaven.qst.table.MultiJoinInput;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.TableCreatorDelegate;
import io.deephaven.qst.table.TicketTable;
import io.deephaven.qst.table.TimeTable;

import java.util.List;
import java.util.Map;
import java.util.Objects;

class TableCreatorTicketInterceptor implements TableCreator<Table> {
private final TableCreator<Table> delegate;
class TableCreatorTicketInterceptor extends TableCreatorDelegate<Table> {
private final Map<TicketTable, Table> map;

public TableCreatorTicketInterceptor(TableCreator<Table> delegate, Map<TicketTable, Table> map) {
this.delegate = Objects.requireNonNull(delegate);
super(delegate);
this.map = Objects.requireNonNull(map);
}

@@ -31,36 +25,6 @@ public Table of(TicketTable ticketTable) {
if (table != null) {
return table;
}
return delegate.of(ticketTable);
}

@Override
public Table of(NewTable newTable) {
return delegate.of(newTable);
}

@Override
public Table of(EmptyTable emptyTable) {
return delegate.of(emptyTable);
}

@Override
public Table of(TimeTable timeTable) {
return delegate.of(timeTable);
}

@Override
public Table of(InputTable inputTable) {
return delegate.of(inputTable);
}

@Override
public Table multiJoin(List<MultiJoinInput<Table>> multiJoinInputs) {
return delegate.multiJoin(multiJoinInputs);
}

@Override
public Table merge(Iterable<Table> tables) {
return delegate.merge(tables);
return super.of(ticketTable);
}
}
31 changes: 17 additions & 14 deletions engine/table/src/main/java/io/deephaven/engine/util/TableTools.java
@@ -32,6 +32,7 @@
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.type.ArrayTypeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.*;
import java.nio.charset.StandardCharsets;
@@ -67,13 +68,13 @@ private static <T> BinaryOperator<T> throwingMerger() {
};
}

private static <T, K, U> Collector<T, ?, Map<K, U>> toLinkedMap(
private static <T, K, U> Collector<T, ?, LinkedHashMap<K, U>> toLinkedMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return Collectors.toMap(keyMapper, valueMapper, throwingMerger(), LinkedHashMap::new);
}

private static final Collector<ColumnHolder<?>, ?, Map<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
private static final Collector<ColumnHolder<?>, ?, LinkedHashMap<String, ColumnSource<?>>> COLUMN_HOLDER_LINKEDMAP_COLLECTOR =
toLinkedMap(ColumnHolder::getName, ColumnHolder::getColumnSource);

/////////// Utilities To Display Tables /////////////////
@@ -752,22 +753,24 @@ public static Table newTable(ColumnHolder<?>... columnHolders) {
checkSizes(columnHolders);
WritableRowSet rowSet = getRowSet(columnHolders);
Map<String, ColumnSource<?>> columns = Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(rowSet.toTracking(), columns) {
{
setFlat();
}
};
QueryTable queryTable = new QueryTable(rowSet.toTracking(), columns);
queryTable.setFlat();
return queryTable;
}

public static Table newTable(TableDefinition definition, ColumnHolder<?>... columnHolders) {
return newTable(definition, null, columnHolders);
}

public static Table newTable(TableDefinition definition, @Nullable Map<String, Object> attributes,
ColumnHolder<?>... columnHolders) {
checkSizes(columnHolders);
WritableRowSet rowSet = getRowSet(columnHolders);
Map<String, ColumnSource<?>> columns = Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
return new QueryTable(definition, rowSet.toTracking(), columns) {
{
setFlat();
}
};
final WritableRowSet rowSet = getRowSet(columnHolders);
final LinkedHashMap<String, ColumnSource<?>> columns =
Arrays.stream(columnHolders).collect(COLUMN_HOLDER_LINKEDMAP_COLLECTOR);
final QueryTable queryTable = new QueryTable(definition, rowSet.toTracking(), columns, null, attributes);
queryTable.setFlat();
return queryTable;
}

/**
@@ -97,8 +97,8 @@ private Liveness() {}
/**
* <p>
* Determine whether a cached object should be reused, w.r.t. liveness. Null inputs are never safe for reuse. If the
* object is a {@link LivenessReferent} and not a non-refreshing {@link DynamicNode}, this method will return the
* result of trying to manage object with the top of the current thread's {@link LivenessScopeStack}.
* object is a {@link LivenessReferent} and is a refreshing {@link DynamicNode}, this method will return the result
* of trying to manage object with the top of the current thread's {@link LivenessScopeStack}.
*
* @param object The object
* @return True if the object did not need management, or if it was successfully managed, false otherwise
@@ -9,10 +9,12 @@
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.util.NameValidator;
import io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.base.ArrayUtil;
import io.deephaven.base.ClassUtil;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ChunkType;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
@@ -27,6 +29,8 @@
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.ColumnFormatting;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.extensions.barrage.BarragePerformanceLog;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
@@ -35,15 +39,10 @@
import io.deephaven.extensions.barrage.chunk.vector.VectorExpansionKernel;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
import io.deephaven.proto.flight.util.MessageHelper;
import io.deephaven.proto.flight.util.SchemaHelper;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.util.ColumnFormatting;
import io.deephaven.engine.util.input.InputTableUpdater;
import io.deephaven.chunk.ChunkType;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
import io.deephaven.qst.column.Column;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;
import io.grpc.stub.StreamObserver;
@@ -69,8 +68,21 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.function.*;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -207,6 +219,14 @@ public static ByteString schemaBytesFromTableDefinition(
fbb, DEFAULT_SNAPSHOT_DESER_OPTIONS, tableDefinition, attributes, isFlat));
}

public static Schema schemaFromTable(@NotNull final Table table) {
return makeSchema(DEFAULT_SNAPSHOT_DESER_OPTIONS, table.getDefinition(), table.getAttributes(), table.isFlat());
}

public static Schema toSchema(final TableDefinition definition, Map<String, Object> attributes, boolean isFlat) {
return makeSchema(DEFAULT_SNAPSHOT_DESER_OPTIONS, definition, attributes, isFlat);
}

public static ByteString schemaBytes(@NotNull final ToIntFunction<FlatBufferBuilder> schemaPayloadWriter) {

// note that flight expects the Schema to be wrapped in a Message prefixed by a 4-byte identifier
@@ -226,17 +246,23 @@ public static int makeTableSchemaPayload(
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);
return makeSchema(options, tableDefinition, attributes, isFlat).getSchema(builder);
}

public static Schema makeSchema(
@NotNull final StreamReaderOptions options,
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<String, Object> attributes,
final boolean isFlat) {
final Map<String, String> schemaMetadata = attributesToMetadata(attributes, isFlat);
final Map<String, String> descriptions = GridAttributes.getColumnDescriptions(attributes);
final InputTableUpdater inputTableUpdater = (InputTableUpdater) attributes.get(Table.INPUT_TABLE_ATTRIBUTE);
final List<Field> fields = columnDefinitionsToFields(
descriptions, inputTableUpdater, tableDefinition, tableDefinition.getColumns(),
ignored -> new HashMap<>(),
attributes, options.columnsAsList())
.collect(Collectors.toList());

return new Schema(fields, schemaMetadata).getSchema(builder);
return new Schema(fields, schemaMetadata);
}

@NotNull
24 changes: 24 additions & 0 deletions extensions/flight-sql/README.md
@@ -0,0 +1,24 @@
# Flight SQL

See [FlightSqlResolver](src/main/java/io/deephaven/server/flightsql/FlightSqlResolver.java) for documentation on
Deephaven's Flight SQL service.

## Client

The Flight SQL client is constructed directly on top of the underlying Flight client.

```java
FlightClient flightClient = ...;
FlightSqlClient flightSqlClient = new FlightSqlClient(flightClient);
```
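
From there, usage follows the standard Arrow Flight SQL client API; for
example (a sketch — the query and table name are placeholders):

```java
FlightInfo info = flightSqlClient.execute("SELECT * FROM my_table");
try (FlightStream stream = flightSqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        // process root ...
    }
}
```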

## JDBC

The stock Flight SQL JDBC driver relies on cookie authorization, which is not enabled on the Deephaven server by default.
To enable it, the request header "x-deephaven-auth-cookie-request" must be set to "true".

Example JDBC connection string for a server using self-signed TLS:

```
jdbc:arrow-flight-sql://localhost:8443/?Authorization=Anonymous&useEncryption=1&disableCertificateVerification=1&x-deephaven-auth-cookie-request=true
```
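
In Java, the connection string can be used directly with
`java.sql.DriverManager` (a sketch; it assumes the Arrow Flight SQL JDBC
driver is on the classpath, and the query is a placeholder):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

try (Connection connection = DriverManager.getConnection(
        "jdbc:arrow-flight-sql://localhost:8443/?Authorization=Anonymous&useEncryption=1"
                + "&disableCertificateVerification=1&x-deephaven-auth-cookie-request=true");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SELECT 1")) {
    while (resultSet.next()) {
        System.out.println(resultSet.getObject(1));
    }
}
```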