Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/29 implement querytable api with in memory persistence #89

Merged
115 changes: 102 additions & 13 deletions docs/protocol/delta-sharing-protocol-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ paths:
- in: query
name: startingTimestamp
required: false
description: 'Starting Timestamp'
description: 'Starting Timestamp in ISO8601 format, in the UTC timezone'
schema:
type: string
responses:
Expand Down Expand Up @@ -353,7 +353,7 @@ paths:
- in: query
name: startingTimestamp
required: false
description: 'Starting Timestamp'
description: 'Starting Timestamp ISO8601 format, in the UTC timezone'
schema:
type: string
- in: header
Expand Down Expand Up @@ -416,7 +416,7 @@ paths:
- in: query
name: startingTimestamp
required: false
description: 'Starting Timestamp'
description: 'Starting Timestamp ISO8601 format, in the UTC timezone'
schema:
type: string
requestBody:
Expand Down Expand Up @@ -494,7 +494,7 @@ paths:
- in: query
name: startingTimestamp
required: false
description: 'The starting timestamp of the query, a string in the Timestamp Format, which will be converted to a version created greater or equal to this timestamp. '
description: 'The starting timestamp of the query, a string in the Timestamp Format, which will be converted to a version created greater or equal to this timestamp. ISO8601 format, in the UTC timezone'
schema:
type: string
- in: query
Expand All @@ -512,7 +512,7 @@ paths:
- in: query
name: endingTimestamp
required: false
description: 'The starting timestamp of the query, a string in the Timestamp Format, which will be converted to a version created greater or equal to this timestamp. '
description: 'The starting timestamp of the query, a string in the Timestamp Format, which will be converted to a version created greater or equal to this timestamp. ISO8601 format, in the UTC timezone'
schema:
type: string
- in: query
Expand Down Expand Up @@ -702,14 +702,15 @@ components:
can send limit=1000 to the server
version:
type: integer
format: int64
description: |
an optional version number. If set, will return files as of the
specified version of the table. This is only supported on tables
with history sharing enabled.
example: 1005
timestamp:
type: string
example: yyyy-[m]m-[d]d hh:mm:ss[.f...]
example: 2022-01-01T00:00:00Z
description: |
an optional timestamp string in the Timestamp Format,. If set, will
return files as of the table version corresponding to the specified
Expand Down Expand Up @@ -755,34 +756,122 @@ components:
message:
type: string

ProtocolResponse:
# This is not used for the spec but comes handy for autogeneration
TableMetadataResponseObject:
type: object
properties:
protocol:
# it refers to ./delta-sharing-protocol.md#protocol
$ref: '#/components/schemas/ProtocolObject'
metadata:
# it refers to ./delta-sharing-protocol.md#metadata
$ref: '#/components/schemas/MetadataObject'

# This is not used for the spec but comes handy for autogeneration
TableQueryResponseObject:
type: object
properties:
protocol:
# it refers to ./delta-sharing-protocol.md#protocol
$ref: '#/components/schemas/ProtocolObject'
metadata:
# it refers to ./delta-sharing-protocol.md#metadata
$ref: '#/components/schemas/MetadataObject'
files:
type: array
items:
# it refers to ./delta-sharing-protocol.md#file
$ref: '#/components/schemas/FileObject'
FileObject:
type: object
properties:
file:
type: object
properties:
url:
type: string
id:
type: string
partitionValues:
type: object
additionalProperties:
type:
string
size:
type: integer
format: int64
stats:
type: string
version:
type: integer
format: int64
timestamp:
type: integer
format: int64
expirationTimestamp:
type: integer
format: int64
required:
- url
- id
- partitionValues
- size
ProtocolObject:
type: object
properties:
protocol:
type: object
properties:
minReaderVersion:
type: number
type: integer
format: int32
FormatObject:
type: object
properties:
provider:
type: string
required:
- provider

MetadataResponse:
MetadataObject:
type: object
properties:
metadata:
type: object
properties:
id:
type: string
name:
type: string
description:
type: string
format:
type: object
properties:
provider:
type: string
$ref: '#/components/schemas/FormatObject'
schemaString:
type: string
partitionColumns:
type: array
items:
type: string
configuration:
type: object
additionalProperties:
type:
string
version:
type: integer
format: int64
size:
type: integer
format: int64
numFiles:
type: integer
format: int64
required:
- id
- format
- schemaString
- partitionColumns

responses:
"400":
Expand Down
2 changes: 1 addition & 1 deletion server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ tasks.jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = BigDecimal.valueOf(0.81)
minimum = BigDecimal.valueOf(0.79)
}
}
}
Expand Down
106 changes: 84 additions & 22 deletions server/src/main/java/io/whitefox/api/deltasharing/Mappers.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package io.whitefox.api.deltasharing;

import io.whitefox.api.deltasharing.model.DeltaTableMetadata;
import io.whitefox.api.deltasharing.model.v1.generated.MetadataResponse;
import io.whitefox.api.deltasharing.model.v1.generated.MetadataResponseMetadata;
import io.whitefox.api.deltasharing.model.v1.generated.MetadataResponseMetadataFormat;
import io.whitefox.api.deltasharing.model.v1.generated.ProtocolResponse;
import io.whitefox.api.deltasharing.model.v1.generated.ProtocolResponseProtocol;
import io.whitefox.api.deltasharing.server.TableResponseMetadata;
import io.whitefox.api.deltasharing.model.v1.generated.*;
import io.whitefox.core.*;
import io.whitefox.core.Schema;
import io.whitefox.core.Share;
import io.whitefox.core.Table;
import io.whitefox.core.storage.CreateStorage;
import io.whitefox.core.storage.Storage;
import io.whitefox.core.storage.StorageType;
import java.math.BigDecimal;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jboss.resteasy.reactive.common.NotImplementedYet;

public class Mappers {
public static io.whitefox.api.deltasharing.model.v1.generated.Share share2api(Share p) {
Expand Down Expand Up @@ -173,23 +171,28 @@ public static MetastoreType api2MetastoreType(
}
}

public static TableResponseMetadata toTableResponseMetadata(
DeltaTableMetadata deltaTableMetadata) {
return new TableResponseMetadata(
new ProtocolResponse()
.protocol(new ProtocolResponseProtocol().minReaderVersion(new BigDecimal(1))),
new MetadataResponse()
.metadata(new MetadataResponseMetadata()
.id(deltaTableMetadata.getMetadata().id())
.format(new MetadataResponseMetadataFormat()
.provider(deltaTableMetadata.getMetadata().format().provider()))
.schemaString(
deltaTableMetadata.getMetadata().tableSchema().structType().toJson())
.partitionColumns(deltaTableMetadata.getMetadata().partitionColumns())));
public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) {
return new TableMetadataResponseObject()
.protocol(new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)))
.metadata(metadata2Api(m));
}

private static MetadataObject metadata2Api(Metadata metadata) {
return new MetadataObject()
.metadata(new MetadataObjectMetadata()
.id(metadata.id())
.name(metadata.name().orElse(null))
.description(metadata.description().orElse(null))
.format(new FormatObject().provider(metadata.format().provider()))
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
._configuration(metadata.configuration())
.version(metadata.version().orElse(null))
.numFiles(metadata.numFiles().orElse(null)));
}

/**
* NOTE: this is ann undocumented feature of the reference impl of delta-sharing, it's not part of the
* NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the
* protocol
* ----
* Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header
Expand All @@ -207,7 +210,66 @@ public static Map<String, String> toHeaderCapabilitiesMap(String headerCapabilit
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static ReadTableRequest api2ReadTableRequest(QueryRequest request) {
if (request.getEndingVersion() != null || request.getStartingVersion() != null)
throw new NotImplementedYet();
if (request.getVersion() != null && request.getTimestamp() == null) {
return new ReadTableRequest.ReadTableVersion(
request.getPredicateHints(),
Optional.ofNullable(request.getLimitHint()),
request.getVersion());
} else if (request.getVersion() == null && request.getTimestamp() != null) {
return new ReadTableRequest.ReadTableAsOfTimestamp(
request.getPredicateHints(),
Optional.ofNullable(request.getLimitHint()),
parse(request.getTimestamp()));
} else if (request.getVersion() == null && request.getTimestamp() == null) {
return new ReadTableRequest.ReadTableCurrentVersion(
request.getPredicateHints(), Optional.ofNullable(request.getLimitHint()));
} else {
throw new IllegalArgumentException("Cannot specify both version and timestamp");
}
}

public static TableQueryResponseObject readTableResult2api(ReadTableResult readTableResult) {
return new TableQueryResponseObject()
.metadata(metadata2Api(readTableResult.metadata()))
.protocol(protocol2Api(readTableResult.protocol()))
.files(
readTableResult.files().stream().map(Mappers::file2Api).collect(Collectors.toList()));
}

private static FileObject file2Api(TableFile f) {
return new FileObject()
._file(new FileObjectFile()
.id(f.id())
.url(f.url())
.partitionValues(f.partitionValues())
.size(f.size())
.stats(f.stats().orElse(null))
.version(f.version().orElse(null))
.timestamp(f.timestamp().orElse(null))
.expirationTimestamp(f.expirationTimestamp()));
}

private static ProtocolObject protocol2Api(Protocol protocol) {
return new ProtocolObject()
.protocol(new ProtocolObjectProtocol()
.minReaderVersion(protocol.minReaderVersion().orElse(1)));
}

public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest(
QueryRequest request, String share, String schema, String table) {
return new TableReferenceAndReadRequest(share, schema, table, api2ReadTableRequest(request));
}

public static <A, B> List<B> mapList(List<A> list, Function<A, B> f) {
return list.stream().map(f).collect(Collectors.toList());
}

private static long parse(String ts) {
return OffsetDateTime.parse(ts, java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.toInstant()
.toEpochMilli();
}
}

This file was deleted.

Loading
Loading