Skip to content

Commit

Permalink
Spooling protocol extension
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Aug 9, 2024
1 parent ce90afa commit 36c08fd
Show file tree
Hide file tree
Showing 124 changed files with 6,703 additions and 417 deletions.
12 changes: 12 additions & 0 deletions client/trino-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@
<version>${dep.jline.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.jline</groupId>
<artifactId>jline-terminal-ffm</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
package io.trino.cli;

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.units.Duration;
import io.trino.client.ClientSession;
import io.trino.client.ClientTypeSignature;
import io.trino.client.Column;
import io.trino.client.JsonCodec;
import io.trino.client.QueryDataClientJacksonModule;
import io.trino.client.QueryResults;
import io.trino.client.StatementStats;
import io.trino.client.uri.PropertyName;
Expand All @@ -42,10 +43,10 @@
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.HttpHeaders.LOCATION;
import static com.google.common.net.HttpHeaders.SET_COOKIE;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.cli.ClientOptions.OutputFormat.CSV;
import static io.trino.cli.TerminalUtils.getTerminal;
import static io.trino.client.ClientStandardTypes.BIGINT;
import static io.trino.client.JsonCodec.jsonCodec;
import static io.trino.client.auth.external.ExternalRedirectStrategy.PRINT;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -54,8 +55,7 @@
@TestInstance(PER_METHOD)
public class TestQueryRunner
{
private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);

private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class, new QueryDataClientJacksonModule());
private MockWebServer server;

@BeforeEach
Expand Down
10 changes: 10 additions & 0 deletions client/trino-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,21 @@
<artifactId>okhttp-urlconnection</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down
23 changes: 23 additions & 0 deletions client/trino-client/src/main/java/io/trino/client/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

@Immutable
Expand Down Expand Up @@ -54,4 +56,25 @@ public ClientTypeSignature getTypeSignature()
{
return typeSignature;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Column column = (Column) o;
return Objects.equals(name, column.name)
&& Objects.equals(type, column.type)
&& Objects.equals(typeSignature, column.typeSignature);
}

@Override
public int hashCode()
{
return Objects.hash(name, type, typeSignature);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.client;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.trino.client.ClientTypeSignatureParameter.ParameterKind;

Expand Down Expand Up @@ -53,33 +54,35 @@
import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;

final class FixJsonDataUtils
public final class FixJsonDataUtils
{
private FixJsonDataUtils() {}

public static Iterable<List<Object>> fixData(List<Column> columns, List<List<Object>> data)
public static Iterable<List<Object>> fixData(List<Column> columns, Iterable<List<Object>> data)
{
if (data == null) {
return null;
}

ColumnTypeHandler[] typeHandlers = createTypeHandlers(columns);
ImmutableList.Builder<List<Object>> rows = ImmutableList.builderWithExpectedSize(data.size());
for (List<Object> row : data) {
if (row.size() != typeHandlers.length) {
throw new IllegalArgumentException("row/column size mismatch");
}
ArrayList<Object> newRow = new ArrayList<>(typeHandlers.length);
int column = 0;
for (Object value : row) {
if (value != null) {
value = typeHandlers[column].fixValue(value);
}
newRow.add(value);
column++;
return Iterables.transform(data, row -> fixRow(typeHandlers, row));
}

private static List<Object> fixRow(ColumnTypeHandler[] typeHandlers, List<Object> row)
{
if (row.size() != typeHandlers.length) {
throw new IllegalArgumentException("row/column size mismatch");
}
ArrayList<Object> newRow = new ArrayList<>(typeHandlers.length);
int column = 0;
for (Object value : row) {
if (value != null) {
value = typeHandlers[column].fixValue(value);
}
rows.add(unmodifiableList(newRow)); // allow nulls in list
newRow.add(value);
column++;
}
return rows.build();
return unmodifiableList(newRow); // allow nulls in list
}

private static ColumnTypeHandler[] createTypeHandlers(List<Column> columns)
Expand Down
16 changes: 16 additions & 0 deletions client/trino-client/src/main/java/io/trino/client/JsonCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
Expand Down Expand Up @@ -65,6 +66,11 @@ public static <T> JsonCodec<T> jsonCodec(Class<T> type)
return new JsonCodec<>(OBJECT_MAPPER_SUPPLIER.get(), type);
}

public static <T> JsonCodec<T> jsonCodec(Class<T> type, Module... extraModules)
{
return new JsonCodec<>(OBJECT_MAPPER_SUPPLIER.get().registerModules(extraModules), type);
}

private final ObjectMapper mapper;
private final Type type;
private final JavaType javaType;
Expand Down Expand Up @@ -106,4 +112,14 @@ public T fromJson(InputStream inputStream)
return value;
}
}

public String toJson(T instance)
{
try {
return mapper.writerFor(javaType).writeValueAsString(instance);
}
catch (IOException exception) {
throw new IllegalArgumentException(String.format("%s could not be converted to JSON", instance.getClass().getName()), exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class ProtocolHeaders
private final String requestClientCapabilities;
private final String requestResourceEstimate;
private final String requestExtraCredential;
private final String requestQueryDataEncoding;
private final String responseSetCatalog;
private final String responseSetSchema;
private final String responseSetPath;
Expand Down Expand Up @@ -89,6 +90,7 @@ private ProtocolHeaders(String name)
requestClientCapabilities = prefix + "Client-Capabilities";
requestResourceEstimate = prefix + "Resource-Estimate";
requestExtraCredential = prefix + "Extra-Credential";
requestQueryDataEncoding = prefix + "Query-Data-Encoding";
responseSetCatalog = prefix + "Set-Catalog";
responseSetSchema = prefix + "Set-Schema";
responseSetPath = prefix + "Set-Path";
Expand Down Expand Up @@ -198,6 +200,11 @@ public String requestExtraCredential()
return requestExtraCredential;
}

public String requestQueryDataEncoding()
{
return requestQueryDataEncoding;
}

public String responseSetCatalog()
{
return responseSetCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package io.trino.client;

import jakarta.annotation.Nullable;

import java.util.List;

public interface QueryData
{
@Nullable
Iterable<List<Object>> getData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.client;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.trino.client.spooling.EncodedQueryData;

import java.io.IOException;
import java.util.List;

/**
* Decodes the direct and encoded protocols.
*
* If the "data" fields starts with an array - this is the direct protocol which requires reading values and wrapping them with a class.
*
* Otherwise, this is an encoded protocol.
*/
public class QueryDataClientJacksonModule
extends SimpleModule
{
private static final TypeReference<Iterable<List<Object>>> DIRECT_FORMAT = new TypeReference<Iterable<List<Object>>>(){};
private static final TypeReference<EncodedQueryData> ENCODED_FORMAT = new TypeReference<EncodedQueryData>(){};

public QueryDataClientJacksonModule()
{
super(QueryDataClientJacksonModule.class.getSimpleName(), Version.unknownVersion());
addDeserializer(QueryData.class, new Deserializer());
}

private static class Deserializer
extends StdDeserializer<QueryData>
{
public Deserializer()
{
super(QueryData.class);
}

@Override
public QueryData deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
// If this is not JSON_ARRAY we are dealing with direct data encoding
if (jsonParser.currentToken().equals(JsonToken.START_ARRAY)) {
return RawQueryData.of(jsonParser.readValueAs(DIRECT_FORMAT));
}
return jsonParser.readValueAs(ENCODED_FORMAT);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.client;

import io.trino.client.spooling.DataAttributes;
import jakarta.annotation.Nullable;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

public interface QueryDataDecoder
{
interface Factory
{
QueryDataDecoder create(List<Column> columns, DataAttributes segmentAttributes);

String encodingId();
}

@Nullable Iterable<List<Object>> decode(@Nullable InputStream input, DataAttributes queryAttributes)
throws IOException;

String encodingId();
}
Loading

0 comments on commit 36c08fd

Please sign in to comment.